base: Add support for returning buffer lists from BaseSrc/PushSrc subclasses

This commit is contained in:
Sebastian Dröge 2022-06-24 10:08:12 +03:00
parent c8e8031b74
commit 4cefe838f1
3 changed files with 178 additions and 9 deletions

View file

@ -20,6 +20,7 @@ bitflags = "1.0"
ffi = { package = "gstreamer-base-sys", version = "0.18", path = "sys", features = ["v1_8"] } ffi = { package = "gstreamer-base-sys", version = "0.18", path = "sys", features = ["v1_8"] }
glib = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.15", version = "0.15" } glib = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.15", version = "0.15" }
gst = { package = "gstreamer", version = "0.18", path = "../gstreamer" } gst = { package = "gstreamer", version = "0.18", path = "../gstreamer" }
atomic_refcell = "0.1"
[dev-dependencies] [dev-dependencies]
gir-format-check = "0.1" gir-format-check = "0.1"

View file

@ -3,18 +3,30 @@
use glib::prelude::*; use glib::prelude::*;
use glib::translate::*; use glib::translate::*;
use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_debug, gst_error}; use gst::{gst_debug, gst_error};
use std::mem; use std::mem;
use std::ptr; use std::ptr;
use atomic_refcell::AtomicRefCell;
use crate::prelude::BaseSrcExtManual;
use crate::BaseSrc; use crate::BaseSrc;
#[derive(Default)]
pub(super) struct InstanceData {
pub(super) pending_buffer_list: AtomicRefCell<Option<gst::BufferList>>,
}
#[derive(Debug)] #[derive(Debug)]
pub enum CreateSuccess { pub enum CreateSuccess {
FilledBuffer, FilledBuffer,
NewBuffer(gst::Buffer), NewBuffer(gst::Buffer),
#[cfg(any(feature = "v1_14", feature = "dox"))]
#[cfg_attr(feature = "dox", doc(cfg(feature = "v1_14")))]
NewBufferList(gst::BufferList),
} }
pub trait BaseSrcImpl: BaseSrcImplExt + ElementImpl { pub trait BaseSrcImpl: BaseSrcImplExt + ElementImpl {
@ -362,14 +374,43 @@ impl<T: BaseSrcImpl> BaseSrcImplExt for T {
// https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3 // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3
let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer; let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer;
gst::FlowSuccess::try_from_glib( let instance_data = self.instance_data::<InstanceData>(BaseSrc::static_type()).unwrap();
if let Err(err) = gst::FlowSuccess::try_from_glib(
f( f(
element.unsafe_cast_ref::<BaseSrc>().to_glib_none().0, element.unsafe_cast_ref::<BaseSrc>().to_glib_none().0,
offset, offset,
length, length,
buffer_ref, buffer_ref,
) )
)?; ) {
*instance_data.pending_buffer_list.borrow_mut() = None;
return Err(err);
}
let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take();
if pending_buffer_list.is_some() &&
(buffer.is_some() || element.unsafe_cast_ref::<BaseSrc>().src_pad().mode() == gst::PadMode::Pull) {
panic!("Buffer lists can only be returned in push mode");
}
if buffer_ptr.is_null() && pending_buffer_list.is_none() {
gst_error!(
gst::CAT_RUST,
obj: element.unsafe_cast_ref::<BaseSrc>(),
"No buffer and no buffer list returned"
);
return Err(gst::FlowError::Error);
}
if !buffer_ptr.is_null() && pending_buffer_list.is_some() {
gst_error!(
gst::CAT_RUST,
obj: element.unsafe_cast_ref::<BaseSrc>(),
"Both buffer and buffer list returned"
);
return Err(gst::FlowError::Error);
}
if let Some(passed_buffer) = buffer { if let Some(passed_buffer) = buffer {
if buffer_ptr != orig_buffer_ptr { if buffer_ptr != orig_buffer_ptr {
@ -415,6 +456,16 @@ impl<T: BaseSrcImpl> BaseSrcImplExt for T {
} else { } else {
Ok(CreateSuccess::FilledBuffer) Ok(CreateSuccess::FilledBuffer)
} }
} else if let Some(buffer_list) = pending_buffer_list {
#[cfg(feature = "v1_14")]
{
Ok(CreateSuccess::NewBufferList(buffer_list))
}
#[cfg(not(feature = "v1_14"))]
{
let _ = buffer_list;
unreachable!()
}
} else { } else {
Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr))) Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr)))
} }
@ -632,6 +683,12 @@ unsafe impl<T: BaseSrcImpl> IsSubclassable<T> for BaseSrc {
klass.unlock_stop = Some(base_src_unlock_stop::<T>); klass.unlock_stop = Some(base_src_unlock_stop::<T>);
klass.decide_allocation = Some(base_src_decide_allocation::<T>); klass.decide_allocation = Some(base_src_decide_allocation::<T>);
} }
fn instance_init(instance: &mut glib::subclass::InitializingObject<T>) {
Self::parent_instance_init(instance);
instance.set_instance_data(BaseSrc::static_type(), InstanceData::default());
}
} }
unsafe extern "C" fn base_src_start<T: BaseSrcImpl>( unsafe extern "C" fn base_src_start<T: BaseSrcImpl>(
@ -789,7 +846,16 @@ unsafe extern "C" fn base_src_create<T: BaseSrcImpl>(
Some(gst::BufferRef::from_mut_ptr(*buffer_ptr)) Some(gst::BufferRef::from_mut_ptr(*buffer_ptr))
}; };
gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { let instance_data = imp
.instance_data::<InstanceData>(BaseSrc::static_type())
.unwrap();
// If there is a pending buffer list at this point then unset it.
if wrap.type_() == T::Type::static_type() {
*instance_data.pending_buffer_list.borrow_mut() = None;
}
let res = gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, {
match imp.create( match imp.create(
wrap.unsafe_cast_ref(), wrap.unsafe_cast_ref(),
offset, offset,
@ -849,11 +915,42 @@ unsafe extern "C" fn base_src_create<T: BaseSrcImpl>(
gst::FlowReturn::Ok gst::FlowReturn::Ok
} }
} }
#[cfg(any(feature = "v1_14", feature = "dox"))]
Ok(CreateSuccess::NewBufferList(new_buffer_list)) => {
if buffer.is_some()
|| wrap.unsafe_cast_ref::<BaseSrc>().src_pad().mode() == gst::PadMode::Pull
{
panic!("Buffer lists can only be returned in push mode");
}
*buffer_ptr = ptr::null_mut();
// If this is the final type then submit the buffer list. This can only be done
// once so can only really be done here.
// FIXME: This won't work if a non-Rust subclass of a Rust subclass is created.
if wrap.type_() == T::Type::static_type() {
ffi::gst_base_src_submit_buffer_list(
wrap.to_glib_none().0,
new_buffer_list.into_ptr(),
);
} else {
*instance_data.pending_buffer_list.borrow_mut() = Some(new_buffer_list);
}
gst::FlowReturn::Ok
}
Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok, Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok,
Err(err) => gst::FlowReturn::from(err), Err(err) => gst::FlowReturn::from(err),
} }
}) })
.into_glib() .into_glib();
// If there is a pending buffer list at this point then unset it.
if wrap.type_() == T::Type::static_type() {
*instance_data.pending_buffer_list.borrow_mut() = None;
}
res
} }
unsafe extern "C" fn base_src_do_seek<T: BaseSrcImpl>( unsafe extern "C" fn base_src_do_seek<T: BaseSrcImpl>(

View file

@ -3,12 +3,14 @@
use glib::prelude::*; use glib::prelude::*;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use glib::translate::*; use glib::translate::*;
use gst::prelude::*;
use gst::{gst_debug, gst_error}; use gst::{gst_debug, gst_error};
use std::ptr; use std::ptr;
use super::base_src::{BaseSrcImpl, CreateSuccess}; use super::base_src::{BaseSrcImpl, CreateSuccess};
use crate::prelude::BaseSrcExtManual;
use crate::PushSrc; use crate::PushSrc;
pub trait PushSrcImpl: PushSrcImplExt + BaseSrcImpl { pub trait PushSrcImpl: PushSrcImplExt + BaseSrcImpl {
@ -113,13 +115,42 @@ impl<T: PushSrcImpl> PushSrcImplExt for T {
// FIXME: Wrong signature in -sys bindings // FIXME: Wrong signature in -sys bindings
// https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3 // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3
let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer; let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer;
let instance_data = self.instance_data::<super::base_src::InstanceData>(crate::BaseSrc::static_type()).unwrap();
gst::FlowSuccess::try_from_glib( if let Err(err) = gst::FlowSuccess::try_from_glib(
f( f(
element.unsafe_cast_ref::<PushSrc>().to_glib_none().0, element.unsafe_cast_ref::<PushSrc>().to_glib_none().0,
buffer_ref, buffer_ref,
) )
)?; ) {
*instance_data.pending_buffer_list.borrow_mut() = None;
return Err(err);
}
let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take();
if pending_buffer_list.is_some() &&
(buffer.is_some() || element.unsafe_cast_ref::<PushSrc>().src_pad().mode() == gst::PadMode::Pull) {
panic!("Buffer lists can only be returned in push mode");
}
let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take();
if buffer_ptr.is_null() && pending_buffer_list.is_none() {
gst_error!(
gst::CAT_RUST,
obj: element.unsafe_cast_ref::<PushSrc>(),
"No buffer and no buffer list returned"
);
return Err(gst::FlowError::Error);
}
if !buffer_ptr.is_null() && pending_buffer_list.is_some() {
gst_error!(
gst::CAT_RUST,
obj: element.unsafe_cast_ref::<PushSrc>(),
"Both buffer and buffer list returned"
);
return Err(gst::FlowError::Error);
}
if let Some(passed_buffer) = buffer { if let Some(passed_buffer) = buffer {
if buffer_ptr != orig_buffer_ptr { if buffer_ptr != orig_buffer_ptr {
@ -165,6 +196,16 @@ impl<T: PushSrcImpl> PushSrcImplExt for T {
} else { } else {
Ok(CreateSuccess::FilledBuffer) Ok(CreateSuccess::FilledBuffer)
} }
} else if let Some(buffer_list) = pending_buffer_list {
#[cfg(feature = "v1_14")]
{
Ok(CreateSuccess::NewBufferList(buffer_list))
}
#[cfg(not(feature = "v1_14"))]
{
let _ = buffer_list;
unreachable!()
}
} else { } else {
Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr))) Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr)))
} }
@ -240,9 +281,16 @@ unsafe extern "C" fn push_src_create<T: PushSrcImpl>(
Some(gst::BufferRef::from_mut_ptr(*buffer_ptr)) Some(gst::BufferRef::from_mut_ptr(*buffer_ptr))
}; };
gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { let instance_data = imp
.instance_data::<super::base_src::InstanceData>(crate::BaseSrc::static_type())
.unwrap();
let res = gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, {
match PushSrcImpl::create(imp, wrap.unsafe_cast_ref(), buffer.as_deref_mut()) { match PushSrcImpl::create(imp, wrap.unsafe_cast_ref(), buffer.as_deref_mut()) {
Ok(CreateSuccess::NewBuffer(new_buffer)) => { Ok(CreateSuccess::NewBuffer(new_buffer)) => {
// Clear any pending buffer list
*instance_data.pending_buffer_list.borrow_mut() = None;
if let Some(passed_buffer) = buffer { if let Some(passed_buffer) = buffer {
if passed_buffer.as_ptr() != new_buffer.as_ptr() { if passed_buffer.as_ptr() != new_buffer.as_ptr() {
gst_debug!( gst_debug!(
@ -295,9 +343,32 @@ unsafe extern "C" fn push_src_create<T: PushSrcImpl>(
gst::FlowReturn::Ok gst::FlowReturn::Ok
} }
} }
Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok, #[cfg(any(feature = "v1_14", feature = "dox"))]
Ok(CreateSuccess::NewBufferList(new_buffer_list)) => {
if buffer.is_some()
|| wrap.unsafe_cast_ref::<PushSrc>().src_pad().mode() == gst::PadMode::Pull
{
panic!("Buffer lists can only be returned in push mode");
}
*buffer_ptr = ptr::null_mut();
// Store it in the instance data so that in the end base_src_create() can
// submit it.
*instance_data.pending_buffer_list.borrow_mut() = Some(new_buffer_list);
gst::FlowReturn::Ok
}
Ok(CreateSuccess::FilledBuffer) => {
// Clear any pending buffer list
*instance_data.pending_buffer_list.borrow_mut() = None;
gst::FlowReturn::Ok
}
Err(err) => gst::FlowReturn::from(err), Err(err) => gst::FlowReturn::from(err),
} }
}) })
.into_glib() .into_glib();
res
} }