forked from mirrors/gstreamer-rs
gstreamer-app: AppSrc futures adapter
An adapter for AppSrc that adds futures capabilities to it in the form of a Sink.
This commit is contained in:
parent
66338881e8
commit
7a05ff52af
3 changed files with 109 additions and 5 deletions
|
@ -28,6 +28,10 @@ gstreamer-base = { version = "0.15", path = "../gstreamer-base" }
|
|||
[build-dependencies]
|
||||
rustdoc-stripper = { version = "0.1", optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
futures-util = { version = "0.3", features = ["sink"] }
|
||||
futures-executor = "0.3"
|
||||
|
||||
[features]
|
||||
default = []
|
||||
v1_10 = ["gstreamer/v1_10", "gstreamer-base/v1_10", "gstreamer-app-sys/v1_10"]
|
||||
|
|
|
@ -15,7 +15,8 @@ use std::cell::RefCell;
|
|||
use std::mem;
|
||||
use std::pin::Pin;
|
||||
use std::ptr;
|
||||
use std::task::{Context, Poll};
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::task::{Context, Poll, Waker};
|
||||
use AppSrc;
|
||||
|
||||
#[allow(clippy::type_complexity)]
|
||||
|
@ -239,6 +240,7 @@ impl AppSrc {
|
|||
#[derive(Debug)]
|
||||
pub struct AppSrcSink {
|
||||
app_src: AppSrc,
|
||||
waker_reference: Arc<Mutex<Option<Waker>>>,
|
||||
}
|
||||
|
||||
impl AppSrcSink {
|
||||
|
@ -246,16 +248,51 @@ impl AppSrcSink {
|
|||
skip_assert_initialized!();
|
||||
|
||||
let app_src = app_src.clone();
|
||||
let waker_reference = Arc::new(Mutex::new(None as Option<Waker>));
|
||||
|
||||
Self { app_src }
|
||||
app_src.set_callbacks(
|
||||
AppSrcCallbacks::new()
|
||||
.need_data({
|
||||
let waker_reference = Arc::clone(&waker_reference);
|
||||
|
||||
move |_, _| {
|
||||
if let Some(waker) = waker_reference.lock().unwrap().take() {
|
||||
waker.wake();
|
||||
}
|
||||
}
|
||||
})
|
||||
.build(),
|
||||
);
|
||||
|
||||
Self {
|
||||
app_src,
|
||||
waker_reference,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for AppSrcSink {
|
||||
fn drop(&mut self) {
|
||||
self.app_src.set_callbacks(AppSrcCallbacks::new().build());
|
||||
}
|
||||
}
|
||||
|
||||
impl Sink<gst::Sample> for AppSrcSink {
|
||||
type Error = gst::FlowError;
|
||||
|
||||
fn poll_ready(self: Pin<&mut Self>, _: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
Poll::Ready(Ok(()))
|
||||
fn poll_ready(self: Pin<&mut Self>, context: &mut Context) -> Poll<Result<(), Self::Error>> {
|
||||
let mut waker = self.waker_reference.lock().unwrap();
|
||||
|
||||
let current_level_bytes = self.app_src.get_current_level_bytes();
|
||||
let max_bytes = self.app_src.get_max_bytes();
|
||||
|
||||
if current_level_bytes >= max_bytes && max_bytes != 0 {
|
||||
waker.replace(context.waker().to_owned());
|
||||
|
||||
Poll::Pending
|
||||
} else {
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
||||
fn start_send(self: Pin<&mut Self>, sample: gst::Sample) -> Result<(), Self::Error> {
|
||||
|
@ -278,11 +315,68 @@ impl Sink<gst::Sample> for AppSrcSink {
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use futures_util::{sink::SinkExt, stream::StreamExt};
|
||||
use gst::prelude::*;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
#[test]
|
||||
fn test_app_src_sink() {
|
||||
gst::init().unwrap();
|
||||
|
||||
unimplemented!()
|
||||
let appsrc = gst::ElementFactory::make("appsrc", None).unwrap();
|
||||
let fakesink = gst::ElementFactory::make("fakesink", None).unwrap();
|
||||
|
||||
fakesink.set_property("signal-handoffs", &true).unwrap();
|
||||
|
||||
let pipeline = gst::Pipeline::new(None);
|
||||
pipeline.add(&appsrc).unwrap();
|
||||
pipeline.add(&fakesink).unwrap();
|
||||
|
||||
appsrc.link(&fakesink).unwrap();
|
||||
|
||||
let mut bus_stream = pipeline.get_bus().unwrap().stream();
|
||||
let mut app_src_sink = appsrc.dynamic_cast::<AppSrc>().unwrap().sink();
|
||||
|
||||
let sample_quantity = 5;
|
||||
|
||||
let samples = (0..sample_quantity)
|
||||
.map(|_| gst::Sample::new().buffer(&gst::Buffer::new()).build())
|
||||
.collect::<Vec<gst::Sample>>();
|
||||
|
||||
let mut sample_stream = futures_util::stream::iter(samples).map(Ok);
|
||||
|
||||
let handoff_count_reference = Arc::new(AtomicUsize::new(0));
|
||||
|
||||
fakesink
|
||||
.connect("handoff", false, {
|
||||
let handoff_count_reference = Arc::clone(&handoff_count_reference);
|
||||
|
||||
move |_| {
|
||||
handoff_count_reference.fetch_add(1, Ordering::AcqRel);
|
||||
|
||||
None
|
||||
}
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
pipeline.set_state(gst::State::Playing).unwrap();
|
||||
|
||||
futures_executor::block_on(app_src_sink.send_all(&mut sample_stream)).unwrap();
|
||||
futures_executor::block_on(app_src_sink.close()).unwrap();
|
||||
|
||||
while let Some(message) = futures_executor::block_on(bus_stream.next()) {
|
||||
match message.view() {
|
||||
gst::MessageView::Eos(_) => break,
|
||||
gst::MessageView::Error(_) => unreachable!(),
|
||||
_ => continue,
|
||||
}
|
||||
}
|
||||
|
||||
pipeline.set_state(gst::State::Null).unwrap();
|
||||
|
||||
assert_eq!(
|
||||
handoff_count_reference.load(Ordering::Acquire),
|
||||
sample_quantity
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,12 @@ extern crate gstreamer_sys as gst_sys;
|
|||
#[macro_use]
|
||||
extern crate glib;
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate futures_util;
|
||||
|
||||
#[cfg(test)]
|
||||
extern crate futures_executor;
|
||||
|
||||
macro_rules! skip_assert_initialized {
|
||||
() => {};
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue