gstreamer-app: AppSink futures adapter

An adapter for AppSink that adds futures capabilities to it in the form of a Stream.
This commit is contained in:
Valmir Pretto 2020-01-29 10:16:29 -03:00
parent 2dd1bba479
commit c0bab74e68

View file

@ -303,11 +303,20 @@ impl AppSinkStream {
} }
} }
#[cfg(any(feature = "v1_10"))]
impl Drop for AppSinkStream {
fn drop(&mut self) {
self.app_sink.set_callbacks(AppSinkCallbacks::new().build());
}
}
#[cfg(any(feature = "v1_10"))] #[cfg(any(feature = "v1_10"))]
impl Stream for AppSinkStream { impl Stream for AppSinkStream {
type Item = gst::Sample; type Item = gst::Sample;
fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> { fn poll_next(self: Pin<&mut Self>, context: &mut Context) -> Poll<Option<Self::Item>> {
let mut waker = self.waker_reference.lock().unwrap();
self.app_sink self.app_sink
.try_pull_sample(gst::ClockTime::from_mseconds(0)) .try_pull_sample(gst::ClockTime::from_mseconds(0))
.map(|sample| Poll::Ready(Some(sample))) .map(|sample| Poll::Ready(Some(sample)))
@ -316,24 +325,42 @@ impl Stream for AppSinkStream {
return Poll::Ready(None); return Poll::Ready(None);
} }
self.waker_reference waker.replace(context.waker().to_owned());
.lock()
.unwrap()
.replace(context.waker().to_owned());
Poll::Pending Poll::Pending
}) })
} }
} }
#[cfg(any(feature = "v1_10"))]
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use futures_util::StreamExt;
use gst::prelude::*;
#[test] #[test]
fn test_app_sink_stream() { fn test_app_sink_stream() {
gst::init().unwrap(); gst::init().unwrap();
unimplemented!() let videotestsrc = gst::ElementFactory::make("videotestsrc", None).unwrap();
let appsink = gst::ElementFactory::make("appsink", None).unwrap();
videotestsrc.set_property("num-buffers", &5).unwrap();
let pipeline = gst::Pipeline::new(None);
pipeline.add(&videotestsrc).unwrap();
pipeline.add(&appsink).unwrap();
videotestsrc.link(&appsink).unwrap();
let app_sink_stream = appsink.dynamic_cast::<AppSink>().unwrap().stream();
let samples_future = app_sink_stream.collect::<Vec<gst::Sample>>();
pipeline.set_state(gst::State::Playing).unwrap();
let samples = futures_executor::block_on(samples_future);
pipeline.set_state(gst::State::Null).unwrap();
assert_eq!(samples.len(), 5);
} }
} }