base: Add support for returning buffer lists from BaseSrc/PushSrc subclasses

This commit is contained in:
Sebastian Dröge 2022-06-24 10:08:12 +03:00
parent 5e3ace0ecc
commit 6b47d646c0
3 changed files with 158 additions and 9 deletions

View file

@ -20,6 +20,7 @@ bitflags = "1.0"
ffi = { package = "gstreamer-base-sys", path = "sys" }
glib = { git = "https://github.com/gtk-rs/gtk-rs-core" }
gst = { package = "gstreamer", path = "../gstreamer" }
atomic_refcell = "0.1"
[dev-dependencies]
gir-format-check = "0.1"

View file

@ -3,17 +3,27 @@
use glib::prelude::*;
use glib::translate::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use std::mem;
use std::ptr;
use atomic_refcell::AtomicRefCell;
use crate::prelude::BaseSrcExtManual;
use crate::BaseSrc;
#[derive(Default)]
pub(super) struct InstanceData {
pub(super) pending_buffer_list: AtomicRefCell<Option<gst::BufferList>>,
}
#[derive(Debug)]
pub enum CreateSuccess {
FilledBuffer,
NewBuffer(gst::Buffer),
NewBufferList(gst::BufferList),
}
pub trait BaseSrcImpl: BaseSrcImplExt + ElementImpl {
@ -361,14 +371,43 @@ impl<T: BaseSrcImpl> BaseSrcImplExt for T {
// https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3
let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer;
gst::FlowSuccess::try_from_glib(
let instance_data = self.instance_data::<InstanceData>(BaseSrc::static_type()).unwrap();
if let Err(err) = gst::FlowSuccess::try_from_glib(
f(
element.unsafe_cast_ref::<BaseSrc>().to_glib_none().0,
offset,
length,
buffer_ref,
)
)?;
) {
*instance_data.pending_buffer_list.borrow_mut() = None;
return Err(err);
}
let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take();
if pending_buffer_list.is_some() &&
(buffer.is_some() || element.unsafe_cast_ref::<BaseSrc>().src_pad().mode() == gst::PadMode::Pull) {
panic!("Buffer lists can only be returned in push mode");
}
if buffer_ptr.is_null() && pending_buffer_list.is_none() {
gst::error!(
gst::CAT_RUST,
obj: element.unsafe_cast_ref::<BaseSrc>(),
"No buffer and no buffer list returned"
);
return Err(gst::FlowError::Error);
}
if !buffer_ptr.is_null() && pending_buffer_list.is_some() {
gst::error!(
gst::CAT_RUST,
obj: element.unsafe_cast_ref::<BaseSrc>(),
"Both buffer and buffer list returned"
);
return Err(gst::FlowError::Error);
}
if let Some(passed_buffer) = buffer {
if buffer_ptr != orig_buffer_ptr {
@ -414,6 +453,8 @@ impl<T: BaseSrcImpl> BaseSrcImplExt for T {
} else {
Ok(CreateSuccess::FilledBuffer)
}
} else if let Some(buffer_list) = pending_buffer_list {
Ok(CreateSuccess::NewBufferList(buffer_list))
} else {
Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr)))
}
@ -631,6 +672,12 @@ unsafe impl<T: BaseSrcImpl> IsSubclassable<T> for BaseSrc {
klass.unlock_stop = Some(base_src_unlock_stop::<T>);
klass.decide_allocation = Some(base_src_decide_allocation::<T>);
}
fn instance_init(instance: &mut glib::subclass::InitializingObject<T>) {
Self::parent_instance_init(instance);
instance.set_instance_data(BaseSrc::static_type(), InstanceData::default());
}
}
unsafe extern "C" fn base_src_start<T: BaseSrcImpl>(
@ -788,7 +835,16 @@ unsafe extern "C" fn base_src_create<T: BaseSrcImpl>(
Some(gst::BufferRef::from_mut_ptr(*buffer_ptr))
};
gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, {
let instance_data = imp
.instance_data::<InstanceData>(BaseSrc::static_type())
.unwrap();
// If there is a pending buffer list at this point then unset it.
if wrap.type_() == T::Type::static_type() {
*instance_data.pending_buffer_list.borrow_mut() = None;
}
let res = gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, {
match imp.create(
wrap.unsafe_cast_ref(),
offset,
@ -848,11 +904,41 @@ unsafe extern "C" fn base_src_create<T: BaseSrcImpl>(
gst::FlowReturn::Ok
}
}
Ok(CreateSuccess::NewBufferList(new_buffer_list)) => {
if buffer.is_some()
|| wrap.unsafe_cast_ref::<BaseSrc>().src_pad().mode() == gst::PadMode::Pull
{
panic!("Buffer lists can only be returned in push mode");
}
*buffer_ptr = ptr::null_mut();
// If this is the final type then submit the buffer list. This can only be done
// once so can only really be done here.
// FIXME: This won't work if a non-Rust subclass of a Rust subclass is created.
if wrap.type_() == T::Type::static_type() {
ffi::gst_base_src_submit_buffer_list(
wrap.to_glib_none().0,
new_buffer_list.into_glib_ptr(),
);
} else {
*instance_data.pending_buffer_list.borrow_mut() = Some(new_buffer_list);
}
gst::FlowReturn::Ok
}
Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok,
Err(err) => gst::FlowReturn::from(err),
}
})
.into_glib()
.into_glib();
// If there is a pending buffer list at this point then unset it.
if wrap.type_() == T::Type::static_type() {
*instance_data.pending_buffer_list.borrow_mut() = None;
}
res
}
unsafe extern "C" fn base_src_do_seek<T: BaseSrcImpl>(

View file

@ -3,10 +3,12 @@
use glib::prelude::*;
use glib::subclass::prelude::*;
use glib::translate::*;
use gst::prelude::*;
use std::ptr;
use super::base_src::{BaseSrcImpl, CreateSuccess};
use crate::prelude::BaseSrcExtManual;
use crate::PushSrc;
pub trait PushSrcImpl: PushSrcImplExt + BaseSrcImpl {
@ -111,13 +113,42 @@ impl<T: PushSrcImpl> PushSrcImplExt for T {
// FIXME: Wrong signature in -sys bindings
// https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3
let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer;
let instance_data = self.instance_data::<super::base_src::InstanceData>(crate::BaseSrc::static_type()).unwrap();
gst::FlowSuccess::try_from_glib(
if let Err(err) = gst::FlowSuccess::try_from_glib(
f(
element.unsafe_cast_ref::<PushSrc>().to_glib_none().0,
buffer_ref,
)
)?;
) {
*instance_data.pending_buffer_list.borrow_mut() = None;
return Err(err);
}
let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take();
if pending_buffer_list.is_some() &&
(buffer.is_some() || element.unsafe_cast_ref::<PushSrc>().src_pad().mode() == gst::PadMode::Pull) {
panic!("Buffer lists can only be returned in push mode");
}
let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take();
if buffer_ptr.is_null() && pending_buffer_list.is_none() {
gst::error!(
gst::CAT_RUST,
obj: element.unsafe_cast_ref::<PushSrc>(),
"No buffer and no buffer list returned"
);
return Err(gst::FlowError::Error);
}
if !buffer_ptr.is_null() && pending_buffer_list.is_some() {
gst::error!(
gst::CAT_RUST,
obj: element.unsafe_cast_ref::<PushSrc>(),
"Both buffer and buffer list returned"
);
return Err(gst::FlowError::Error);
}
if let Some(passed_buffer) = buffer {
if buffer_ptr != orig_buffer_ptr {
@ -163,6 +194,8 @@ impl<T: PushSrcImpl> PushSrcImplExt for T {
} else {
Ok(CreateSuccess::FilledBuffer)
}
} else if let Some(buffer_list) = pending_buffer_list {
Ok(CreateSuccess::NewBufferList(buffer_list))
} else {
Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr)))
}
@ -238,9 +271,16 @@ unsafe extern "C" fn push_src_create<T: PushSrcImpl>(
Some(gst::BufferRef::from_mut_ptr(*buffer_ptr))
};
gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, {
let instance_data = imp
.instance_data::<super::base_src::InstanceData>(crate::BaseSrc::static_type())
.unwrap();
let res = gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, {
match PushSrcImpl::create(imp, wrap.unsafe_cast_ref(), buffer.as_deref_mut()) {
Ok(CreateSuccess::NewBuffer(new_buffer)) => {
// Clear any pending buffer list
*instance_data.pending_buffer_list.borrow_mut() = None;
if let Some(passed_buffer) = buffer {
if passed_buffer.as_ptr() != new_buffer.as_ptr() {
gst::debug!(
@ -293,9 +333,31 @@ unsafe extern "C" fn push_src_create<T: PushSrcImpl>(
gst::FlowReturn::Ok
}
}
Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok,
Ok(CreateSuccess::NewBufferList(new_buffer_list)) => {
if buffer.is_some()
|| wrap.unsafe_cast_ref::<PushSrc>().src_pad().mode() == gst::PadMode::Pull
{
panic!("Buffer lists can only be returned in push mode");
}
*buffer_ptr = ptr::null_mut();
// Store it in the instance data so that in the end base_src_create() can
// submit it.
*instance_data.pending_buffer_list.borrow_mut() = Some(new_buffer_list);
gst::FlowReturn::Ok
}
Ok(CreateSuccess::FilledBuffer) => {
// Clear any pending buffer list
*instance_data.pending_buffer_list.borrow_mut() = None;
gst::FlowReturn::Ok
}
Err(err) => gst::FlowReturn::from(err),
}
})
.into_glib()
.into_glib();
res
}