diff --git a/gstreamer-base/Cargo.toml b/gstreamer-base/Cargo.toml index 331f8f1fa..c1ecba60f 100644 --- a/gstreamer-base/Cargo.toml +++ b/gstreamer-base/Cargo.toml @@ -20,6 +20,7 @@ bitflags = "1.0" ffi = { package = "gstreamer-base-sys", path = "sys" } glib = { git = "https://github.com/gtk-rs/gtk-rs-core" } gst = { package = "gstreamer", path = "../gstreamer" } +atomic_refcell = "0.1" [dev-dependencies] gir-format-check = "0.1" diff --git a/gstreamer-base/src/subclass/base_src.rs b/gstreamer-base/src/subclass/base_src.rs index ed90d80d9..f8a76ce0a 100644 --- a/gstreamer-base/src/subclass/base_src.rs +++ b/gstreamer-base/src/subclass/base_src.rs @@ -3,17 +3,27 @@ use glib::prelude::*; use glib::translate::*; +use gst::prelude::*; use gst::subclass::prelude::*; use std::mem; use std::ptr; +use atomic_refcell::AtomicRefCell; + +use crate::prelude::BaseSrcExtManual; use crate::BaseSrc; +#[derive(Default)] +pub(super) struct InstanceData { + pub(super) pending_buffer_list: AtomicRefCell>, +} + #[derive(Debug)] pub enum CreateSuccess { FilledBuffer, NewBuffer(gst::Buffer), + NewBufferList(gst::BufferList), } pub trait BaseSrcImpl: BaseSrcImplExt + ElementImpl { @@ -361,14 +371,43 @@ impl BaseSrcImplExt for T { // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3 let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer; - gst::FlowSuccess::try_from_glib( + let instance_data = self.instance_data::(BaseSrc::static_type()).unwrap(); + + if let Err(err) = gst::FlowSuccess::try_from_glib( f( element.unsafe_cast_ref::().to_glib_none().0, offset, length, buffer_ref, ) - )?; + ) { + *instance_data.pending_buffer_list.borrow_mut() = None; + return Err(err); + } + + let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take(); + if pending_buffer_list.is_some() && + (buffer.is_some() || element.unsafe_cast_ref::().src_pad().mode() == gst::PadMode::Pull) { + panic!("Buffer lists can only be returned in push mode"); + } + + if buffer_ptr.is_null() && pending_buffer_list.is_none() { + gst::error!( + gst::CAT_RUST, + obj: element.unsafe_cast_ref::(), + "No buffer and no buffer list returned" + ); + return Err(gst::FlowError::Error); + } + + if !buffer_ptr.is_null() && pending_buffer_list.is_some() { + gst::error!( + gst::CAT_RUST, + obj: element.unsafe_cast_ref::(), + "Both buffer and buffer list returned" + ); + return Err(gst::FlowError::Error); + } if let Some(passed_buffer) = buffer { if buffer_ptr != orig_buffer_ptr { @@ -414,6 +453,8 @@ impl BaseSrcImplExt for T { } else { Ok(CreateSuccess::FilledBuffer) } + } else if let Some(buffer_list) = pending_buffer_list { + Ok(CreateSuccess::NewBufferList(buffer_list)) } else { Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr))) } @@ -631,6 +672,12 @@ unsafe impl IsSubclassable for BaseSrc { klass.unlock_stop = Some(base_src_unlock_stop::); klass.decide_allocation = Some(base_src_decide_allocation::); } + + fn instance_init(instance: &mut glib::subclass::InitializingObject) { + Self::parent_instance_init(instance); + + instance.set_instance_data(BaseSrc::static_type(), InstanceData::default()); + } } unsafe extern "C" fn base_src_start( @@ -788,7 +835,16 @@ unsafe extern "C" fn base_src_create( Some(gst::BufferRef::from_mut_ptr(*buffer_ptr)) }; - gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { + let instance_data = imp + .instance_data::(BaseSrc::static_type()) + .unwrap(); + + // If there is a pending buffer list at this point then unset it. + if wrap.type_() == T::Type::static_type() { + *instance_data.pending_buffer_list.borrow_mut() = None; + } + + let res = gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { match imp.create( wrap.unsafe_cast_ref(), offset, @@ -848,11 +904,41 @@ unsafe extern "C" fn base_src_create( gst::FlowReturn::Ok } } + Ok(CreateSuccess::NewBufferList(new_buffer_list)) => { + if buffer.is_some() + || wrap.unsafe_cast_ref::().src_pad().mode() == gst::PadMode::Pull + { + panic!("Buffer lists can only be returned in push mode"); + } + + *buffer_ptr = ptr::null_mut(); + + // If this is the final type then submit the buffer list. This can only be done + // once so can only really be done here. + // FIXME: This won't work if a non-Rust subclass of a Rust subclass is created. + if wrap.type_() == T::Type::static_type() { + ffi::gst_base_src_submit_buffer_list( + wrap.to_glib_none().0, + new_buffer_list.into_glib_ptr(), + ); + } else { + *instance_data.pending_buffer_list.borrow_mut() = Some(new_buffer_list); + } + + gst::FlowReturn::Ok + } Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok, Err(err) => gst::FlowReturn::from(err), } }) - .into_glib() + .into_glib(); + + // If there is a pending buffer list at this point then unset it. + if wrap.type_() == T::Type::static_type() { + *instance_data.pending_buffer_list.borrow_mut() = None; + } + + res } unsafe extern "C" fn base_src_do_seek( diff --git a/gstreamer-base/src/subclass/push_src.rs b/gstreamer-base/src/subclass/push_src.rs index 9bd54c029..078827bd6 100644 --- a/gstreamer-base/src/subclass/push_src.rs +++ b/gstreamer-base/src/subclass/push_src.rs @@ -3,10 +3,12 @@ use glib::prelude::*; use glib::subclass::prelude::*; use glib::translate::*; +use gst::prelude::*; use std::ptr; use super::base_src::{BaseSrcImpl, CreateSuccess}; +use crate::prelude::BaseSrcExtManual; use crate::PushSrc; pub trait PushSrcImpl: PushSrcImplExt + BaseSrcImpl { @@ -111,13 +113,42 @@ impl PushSrcImplExt for T { // FIXME: Wrong signature in -sys bindings // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3 let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer; + let instance_data = self.instance_data::(crate::BaseSrc::static_type()).unwrap(); - gst::FlowSuccess::try_from_glib( + if let Err(err) = gst::FlowSuccess::try_from_glib( f( element.unsafe_cast_ref::().to_glib_none().0, buffer_ref, ) - )?; + ) { + *instance_data.pending_buffer_list.borrow_mut() = None; + return Err(err); + } + + let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take(); + if pending_buffer_list.is_some() && + (buffer.is_some() || element.unsafe_cast_ref::().src_pad().mode() == gst::PadMode::Pull) { + panic!("Buffer lists can only be returned in push mode"); + } + + let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take(); + if buffer_ptr.is_null() && pending_buffer_list.is_none() { + gst::error!( + gst::CAT_RUST, + obj: element.unsafe_cast_ref::(), + "No buffer and no buffer list returned" + ); + return Err(gst::FlowError::Error); + } + + if !buffer_ptr.is_null() && pending_buffer_list.is_some() { + gst::error!( + gst::CAT_RUST, + obj: element.unsafe_cast_ref::(), + "Both buffer and buffer list returned" + ); + return Err(gst::FlowError::Error); + } if let Some(passed_buffer) = buffer { if buffer_ptr != orig_buffer_ptr { @@ -163,6 +194,8 @@ impl PushSrcImplExt for T { } else { Ok(CreateSuccess::FilledBuffer) } + } else if let Some(buffer_list) = pending_buffer_list { + Ok(CreateSuccess::NewBufferList(buffer_list)) } else { Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr))) } @@ -238,9 +271,16 @@ unsafe extern "C" fn push_src_create( Some(gst::BufferRef::from_mut_ptr(*buffer_ptr)) }; - gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { + let instance_data = imp + .instance_data::(crate::BaseSrc::static_type()) + .unwrap(); + + let res = gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { match PushSrcImpl::create(imp, wrap.unsafe_cast_ref(), buffer.as_deref_mut()) { Ok(CreateSuccess::NewBuffer(new_buffer)) => { + // Clear any pending buffer list + *instance_data.pending_buffer_list.borrow_mut() = None; + if let Some(passed_buffer) = buffer { if passed_buffer.as_ptr() != new_buffer.as_ptr() { gst::debug!( @@ -293,9 +333,31 @@ unsafe extern "C" fn push_src_create( gst::FlowReturn::Ok } } - Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok, + Ok(CreateSuccess::NewBufferList(new_buffer_list)) => { + if buffer.is_some() + || wrap.unsafe_cast_ref::().src_pad().mode() == gst::PadMode::Pull + { + panic!("Buffer lists can only be returned in push mode"); + } + + *buffer_ptr = ptr::null_mut(); + + // Store it in the instance data so that in the end base_src_create() can + // submit it. + *instance_data.pending_buffer_list.borrow_mut() = Some(new_buffer_list); + + gst::FlowReturn::Ok + } + Ok(CreateSuccess::FilledBuffer) => { + // Clear any pending buffer list + *instance_data.pending_buffer_list.borrow_mut() = None; + + gst::FlowReturn::Ok + } Err(err) => gst::FlowReturn::from(err), } }) - .into_glib() + .into_glib(); + + res }