gstreamer-app: AppSrc futures adapter

An adapter for AppSrc that adds futures capabilities to it in the form of a Sink.
This commit is contained in:
Valmir Pretto 2020-01-29 12:51:47 -03:00
parent 6263922b6d
commit 2dd1bba479
3 changed files with 109 additions and 5 deletions

View file

@ -28,6 +28,10 @@ gstreamer-base = { path = "../gstreamer-base" }
[build-dependencies] [build-dependencies]
rustdoc-stripper = { version = "0.1", optional = true } rustdoc-stripper = { version = "0.1", optional = true }
[dev-dependencies]
futures-util = { version = "0.3", features = ["sink"] }
futures-executor = "0.3"
[features] [features]
default = [] default = []
v1_10 = ["gstreamer/v1_10", "gstreamer-base/v1_10", "gstreamer-app-sys/v1_10"] v1_10 = ["gstreamer/v1_10", "gstreamer-base/v1_10", "gstreamer-app-sys/v1_10"]

View file

@ -15,7 +15,8 @@ use std::cell::RefCell;
use std::mem; use std::mem;
use std::pin::Pin; use std::pin::Pin;
use std::ptr; use std::ptr;
use std::task::{Context, Poll}; use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use AppSrc; use AppSrc;
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
@ -239,6 +240,7 @@ impl AppSrc {
#[derive(Debug)] #[derive(Debug)]
pub struct AppSrcSink { pub struct AppSrcSink {
app_src: AppSrc, app_src: AppSrc,
waker_reference: Arc<Mutex<Option<Waker>>>,
} }
impl AppSrcSink { impl AppSrcSink {
@ -246,16 +248,51 @@ impl AppSrcSink {
skip_assert_initialized!(); skip_assert_initialized!();
let app_src = app_src.clone(); let app_src = app_src.clone();
let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
Self { app_src } app_src.set_callbacks(
AppSrcCallbacks::new()
.need_data({
let waker_reference = Arc::clone(&waker_reference);
move |_, _| {
if let Some(waker) = waker_reference.lock().unwrap().take() {
waker.wake();
}
}
})
.build(),
);
Self {
app_src,
waker_reference,
}
}
}
impl Drop for AppSrcSink {
fn drop(&mut self) {
self.app_src.set_callbacks(AppSrcCallbacks::new().build());
} }
} }
impl Sink<gst::Sample> for AppSrcSink { impl Sink<gst::Sample> for AppSrcSink {
type Error = gst::FlowError; type Error = gst::FlowError;
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> { fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(())) let mut waker = self.waker_reference.lock().unwrap();
let current_level_bytes = self.app_src.get_current_level_bytes();
let max_bytes = self.app_src.get_max_bytes();
if current_level_bytes >= max_bytes && max_bytes != 0 {
waker.replace(context.waker().to_owned());
Poll::Pending
} else {
Poll::Ready(Ok(()))
}
} }
fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> { fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
@ -278,11 +315,68 @@ impl Sink<gst::Sample> for AppSrcSink {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use futures_util::{sink::SinkExt, stream::StreamExt};
use gst::prelude::*;
use std::sync::atomic::{AtomicUsize, Ordering};
#[test] #[test]
fn test_app_src_sink() { fn test_app_src_sink() {
gst::init().unwrap(); gst::init().unwrap();
unimplemented!() let appsrc = gst::ElementFactory::make("appsrc", None).unwrap();
let fakesink = gst::ElementFactory::make("fakesink", None).unwrap();
fakesink.set_property("signal-handoffs", &true).unwrap();
let pipeline = gst::Pipeline::new(None);
pipeline.add(&appsrc).unwrap();
pipeline.add(&fakesink).unwrap();
appsrc.link(&fakesink).unwrap();
let mut bus_stream = pipeline.get_bus().unwrap().stream();
let mut app_src_sink = appsrc.dynamic_cast::<AppSrc>().unwrap().sink();
let sample_quantity = 5;
let samples = (0..sample_quantity)
.map(|_| gst::Sample::new().buffer(&gst::Buffer::new()).build())
.collect::<Vec<gst::Sample>>();
let mut sample_stream = futures_util::stream::iter(samples).map(Ok);
let handoff_count_reference = Arc::new(AtomicUsize::new(0));
fakesink
.connect("handoff", false, {
let handoff_count_reference = Arc::clone(&handoff_count_reference);
move |_| {
handoff_count_reference.fetch_add(1, Ordering::AcqRel);
None
}
})
.unwrap();
pipeline.set_state(gst::State::Playing).unwrap();
futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap();
futures_executor::block_on(app_src_sink.close()).unwrap();
while let Some(message) = futures_executor::block_on(bus_stream.next()) {
match message.view() {
gst::MessageView::Eos(_) => break,
gst::MessageView::Error(_) => unreachable!(),
_ => continue,
}
}
pipeline.set_state(gst::State::Null).unwrap();
assert_eq!(
handoff_count_reference.load(Ordering::Acquire),
sample_quantity
);
} }
} }

View file

@ -20,6 +20,12 @@ extern crate gstreamer_sys as gst_sys;
#[macro_use] #[macro_use]
extern crate glib; extern crate glib;
#[cfg(test)]
extern crate futures_util;
#[cfg(test)]
extern crate futures_executor;
macro_rules! skip_assert_initialized { macro_rules! skip_assert_initialized {
() => {}; () => {};
} }