diff --git a/gstreamer-base/Cargo.toml b/gstreamer-base/Cargo.toml index cb64fd941..5c8394e83 100644 --- a/gstreamer-base/Cargo.toml +++ b/gstreamer-base/Cargo.toml @@ -20,6 +20,7 @@ bitflags = "1.0" ffi = { package = "gstreamer-base-sys", version = "0.18", path = "sys", features = ["v1_8"] } glib = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.15", version = "0.15" } gst = { package = "gstreamer", version = "0.18", path = "../gstreamer" } +atomic_refcell = "0.1" [dev-dependencies] gir-format-check = "0.1" diff --git a/gstreamer-base/src/subclass/base_src.rs b/gstreamer-base/src/subclass/base_src.rs index f58657539..896066044 100644 --- a/gstreamer-base/src/subclass/base_src.rs +++ b/gstreamer-base/src/subclass/base_src.rs @@ -3,18 +3,30 @@ use glib::prelude::*; use glib::translate::*; +use gst::prelude::*; use gst::subclass::prelude::*; use gst::{gst_debug, gst_error}; use std::mem; use std::ptr; +use atomic_refcell::AtomicRefCell; + +use crate::prelude::BaseSrcExtManual; use crate::BaseSrc; +#[derive(Default)] +pub(super) struct InstanceData { + pub(super) pending_buffer_list: AtomicRefCell>, +} + #[derive(Debug)] pub enum CreateSuccess { FilledBuffer, NewBuffer(gst::Buffer), + #[cfg(any(feature = "v1_14", feature = "dox"))] + #[cfg_attr(feature = "dox", doc(cfg(feature = "v1_14")))] + NewBufferList(gst::BufferList), } pub trait BaseSrcImpl: BaseSrcImplExt + ElementImpl { @@ -362,14 +374,43 @@ impl BaseSrcImplExt for T { // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3 let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer; - gst::FlowSuccess::try_from_glib( + let instance_data = self.instance_data::(BaseSrc::static_type()).unwrap(); + + if let Err(err) = gst::FlowSuccess::try_from_glib( f( element.unsafe_cast_ref::().to_glib_none().0, offset, length, buffer_ref, ) - )?; + ) { + *instance_data.pending_buffer_list.borrow_mut() = None; + return Err(err); + } + + let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take(); + if pending_buffer_list.is_some() && + (buffer.is_some() || element.unsafe_cast_ref::().src_pad().mode() == gst::PadMode::Pull) { + panic!("Buffer lists can only be returned in push mode"); + } + + if buffer_ptr.is_null() && pending_buffer_list.is_none() { + gst_error!( + gst::CAT_RUST, + obj: element.unsafe_cast_ref::(), + "No buffer and no buffer list returned" + ); + return Err(gst::FlowError::Error); + } + + if !buffer_ptr.is_null() && pending_buffer_list.is_some() { + gst_error!( + gst::CAT_RUST, + obj: element.unsafe_cast_ref::(), + "Both buffer and buffer list returned" + ); + return Err(gst::FlowError::Error); + } if let Some(passed_buffer) = buffer { if buffer_ptr != orig_buffer_ptr { @@ -415,6 +456,16 @@ impl BaseSrcImplExt for T { } else { Ok(CreateSuccess::FilledBuffer) } + } else if let Some(buffer_list) = pending_buffer_list { + #[cfg(feature = "v1_14")] + { + Ok(CreateSuccess::NewBufferList(buffer_list)) + } + #[cfg(not(feature = "v1_14"))] + { + let _ = buffer_list; + unreachable!() + } } else { Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr))) } @@ -632,6 +683,12 @@ unsafe impl IsSubclassable for BaseSrc { klass.unlock_stop = Some(base_src_unlock_stop::); klass.decide_allocation = Some(base_src_decide_allocation::); } + + fn instance_init(instance: &mut glib::subclass::InitializingObject) { + Self::parent_instance_init(instance); + + instance.set_instance_data(BaseSrc::static_type(), InstanceData::default()); + } } unsafe extern "C" fn base_src_start( @@ -789,7 +846,16 @@ unsafe extern "C" fn base_src_create( Some(gst::BufferRef::from_mut_ptr(*buffer_ptr)) }; - gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { + let instance_data = imp + .instance_data::(BaseSrc::static_type()) + .unwrap(); + + // If there is a pending buffer list at this point then unset it. + if wrap.type_() == T::Type::static_type() { + *instance_data.pending_buffer_list.borrow_mut() = None; + } + + let res = gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { match imp.create( wrap.unsafe_cast_ref(), offset, @@ -849,11 +915,42 @@ unsafe extern "C" fn base_src_create( gst::FlowReturn::Ok } } + #[cfg(any(feature = "v1_14", feature = "dox"))] + Ok(CreateSuccess::NewBufferList(new_buffer_list)) => { + if buffer.is_some() + || wrap.unsafe_cast_ref::().src_pad().mode() == gst::PadMode::Pull + { + panic!("Buffer lists can only be returned in push mode"); + } + + *buffer_ptr = ptr::null_mut(); + + // If this is the final type then submit the buffer list. This can only be done + // once so can only really be done here. + // FIXME: This won't work if a non-Rust subclass of a Rust subclass is created. + if wrap.type_() == T::Type::static_type() { + ffi::gst_base_src_submit_buffer_list( + wrap.to_glib_none().0, + new_buffer_list.into_ptr(), + ); + } else { + *instance_data.pending_buffer_list.borrow_mut() = Some(new_buffer_list); + } + + gst::FlowReturn::Ok + } Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok, Err(err) => gst::FlowReturn::from(err), } }) - .into_glib() + .into_glib(); + + // If there is a pending buffer list at this point then unset it. + if wrap.type_() == T::Type::static_type() { + *instance_data.pending_buffer_list.borrow_mut() = None; + } + + res } unsafe extern "C" fn base_src_do_seek( diff --git a/gstreamer-base/src/subclass/push_src.rs b/gstreamer-base/src/subclass/push_src.rs index fe9b7c3f4..3aefeab9d 100644 --- a/gstreamer-base/src/subclass/push_src.rs +++ b/gstreamer-base/src/subclass/push_src.rs @@ -3,12 +3,14 @@ use glib::prelude::*; use glib::subclass::prelude::*; use glib::translate::*; +use gst::prelude::*; use gst::{gst_debug, gst_error}; use std::ptr; use super::base_src::{BaseSrcImpl, CreateSuccess}; +use crate::prelude::BaseSrcExtManual; use crate::PushSrc; pub trait PushSrcImpl: PushSrcImplExt + BaseSrcImpl { @@ -113,13 +115,42 @@ impl PushSrcImplExt for T { // FIXME: Wrong signature in -sys bindings // https://gitlab.freedesktop.org/gstreamer/gstreamer-rs-sys/issues/3 let buffer_ref = &mut buffer_ptr as *mut _ as *mut gst::ffi::GstBuffer; + let instance_data = self.instance_data::(crate::BaseSrc::static_type()).unwrap(); - gst::FlowSuccess::try_from_glib( + if let Err(err) = gst::FlowSuccess::try_from_glib( f( element.unsafe_cast_ref::().to_glib_none().0, buffer_ref, ) - )?; + ) { + *instance_data.pending_buffer_list.borrow_mut() = None; + return Err(err); + } + + let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take(); + if pending_buffer_list.is_some() && + (buffer.is_some() || element.unsafe_cast_ref::().src_pad().mode() == gst::PadMode::Pull) { + panic!("Buffer lists can only be returned in push mode"); + } + + let pending_buffer_list = instance_data.pending_buffer_list.borrow_mut().take(); + if buffer_ptr.is_null() && pending_buffer_list.is_none() { + gst_error!( + gst::CAT_RUST, + obj: element.unsafe_cast_ref::(), + "No buffer and no buffer list returned" + ); + return Err(gst::FlowError::Error); + } + + if !buffer_ptr.is_null() && pending_buffer_list.is_some() { + gst_error!( + gst::CAT_RUST, + obj: element.unsafe_cast_ref::(), + "Both buffer and buffer list returned" + ); + return Err(gst::FlowError::Error); + } if let Some(passed_buffer) = buffer { if buffer_ptr != orig_buffer_ptr { @@ -165,6 +196,16 @@ impl PushSrcImplExt for T { } else { Ok(CreateSuccess::FilledBuffer) } + } else if let Some(buffer_list) = pending_buffer_list { + #[cfg(feature = "v1_14")] + { + Ok(CreateSuccess::NewBufferList(buffer_list)) + } + #[cfg(not(feature = "v1_14"))] + { + let _ = buffer_list; + unreachable!() + } } else { Ok(CreateSuccess::NewBuffer(from_glib_full(buffer_ptr))) } @@ -240,9 +281,16 @@ unsafe extern "C" fn push_src_create( Some(gst::BufferRef::from_mut_ptr(*buffer_ptr)) }; - gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { + let instance_data = imp + .instance_data::(crate::BaseSrc::static_type()) + .unwrap(); + + let res = gst::panic_to_error!(&wrap, imp.panicked(), gst::FlowReturn::Error, { match PushSrcImpl::create(imp, wrap.unsafe_cast_ref(), buffer.as_deref_mut()) { Ok(CreateSuccess::NewBuffer(new_buffer)) => { + // Clear any pending buffer list + *instance_data.pending_buffer_list.borrow_mut() = None; + if let Some(passed_buffer) = buffer { if passed_buffer.as_ptr() != new_buffer.as_ptr() { gst_debug!( @@ -295,9 +343,32 @@ unsafe extern "C" fn push_src_create( gst::FlowReturn::Ok } } - Ok(CreateSuccess::FilledBuffer) => gst::FlowReturn::Ok, + #[cfg(any(feature = "v1_14", feature = "dox"))] + Ok(CreateSuccess::NewBufferList(new_buffer_list)) => { + if buffer.is_some() + || wrap.unsafe_cast_ref::().src_pad().mode() == gst::PadMode::Pull + { + panic!("Buffer lists can only be returned in push mode"); + } + + *buffer_ptr = ptr::null_mut(); + + // Store it in the instance data so that in the end base_src_create() can + // submit it. + *instance_data.pending_buffer_list.borrow_mut() = Some(new_buffer_list); + + gst::FlowReturn::Ok + } + Ok(CreateSuccess::FilledBuffer) => { + // Clear any pending buffer list + *instance_data.pending_buffer_list.borrow_mut() = None; + + gst::FlowReturn::Ok + } Err(err) => gst::FlowReturn::from(err), } }) - .into_glib() + .into_glib(); + + res }