basetransform: Fix memory leak when dropping buffers from the transform function

Also add a basic test for a basetransform subclass.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/472

Part-of: <https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/merge_requests/1290>
This commit is contained in:
Sebastian Dröge 2023-06-30 10:24:55 +03:00
parent cbe627fe8b
commit 38420c7aab
2 changed files with 140 additions and 10 deletions

View file

@ -21,6 +21,7 @@ ffi = { package = "gstreamer-base-sys", path = "sys", version = "0.20" }
glib = { git = "https://github.com/gtk-rs/gtk-rs-core", branch = "0.17", version = "0.17" }
gst = { package = "gstreamer", path = "../gstreamer", version = "0.20" }
atomic_refcell = "0.1"
once_cell = "1"
[dev-dependencies]
gir-format-check = "0.1"

View file

@ -828,21 +828,20 @@ impl<T: BaseTransformImpl> BaseTransformImplExt for T {
.expect("Missing parent function `generate_output`");
let mut outbuf = ptr::null_mut();
gst::FlowSuccess::try_from_glib(f(
let res = gst::FlowSuccess::try_from_glib(f(
self.obj()
.unsafe_cast_ref::<BaseTransform>()
.to_glib_none()
.0,
&mut outbuf,
))
.map(|res| {
if res == crate::BASE_TRANSFORM_FLOW_DROPPED {
GenerateOutputSuccess::Dropped
} else if res != gst::FlowSuccess::Ok || outbuf.is_null() {
GenerateOutputSuccess::NoOutput
} else {
GenerateOutputSuccess::Buffer(from_glib_full(outbuf))
}
));
let outbuf = Option::<gst::Buffer>::from_glib_full(outbuf);
res.map(move |res| match (res, outbuf) {
(crate::BASE_TRANSFORM_FLOW_DROPPED, _) => GenerateOutputSuccess::Dropped,
(gst::FlowSuccess::Ok, Some(outbuf)) => GenerateOutputSuccess::Buffer(outbuf),
_ => GenerateOutputSuccess::NoOutput,
})
}
}
@ -1378,3 +1377,133 @@ unsafe extern "C" fn base_transform_generate_output<T: BaseTransformImpl>(
})
.into_glib()
}
#[cfg(test)]
mod tests {
use super::*;
pub mod imp {
use super::*;
use std::sync::atomic::{self, AtomicBool};
#[derive(Default)]
pub struct TestTransform {
drop_next: AtomicBool,
}
#[glib::object_subclass]
impl ObjectSubclass for TestTransform {
const NAME: &'static str = "TestTransform";
type Type = super::TestTransform;
type ParentType = crate::BaseTransform;
}
impl ObjectImpl for TestTransform {}
impl GstObjectImpl for TestTransform {}
impl ElementImpl for TestTransform {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
use once_cell::sync::Lazy;
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Test Transform",
"Generic",
"Does nothing",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
use once_cell::sync::Lazy;
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_any();
vec![
gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap(),
gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap(),
]
});
PAD_TEMPLATES.as_ref()
}
}
impl BaseTransformImpl for TestTransform {
const MODE: BaseTransformMode = BaseTransformMode::AlwaysInPlace;
const PASSTHROUGH_ON_SAME_CAPS: bool = false;
const TRANSFORM_IP_ON_PASSTHROUGH: bool = false;
fn transform_ip(
&self,
_buf: &mut gst::BufferRef,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if self.drop_next.load(atomic::Ordering::SeqCst) {
self.drop_next.store(false, atomic::Ordering::SeqCst);
Ok(crate::BASE_TRANSFORM_FLOW_DROPPED)
} else {
self.drop_next.store(true, atomic::Ordering::SeqCst);
Ok(gst::FlowSuccess::Ok)
}
}
}
}
glib::wrapper! {
pub struct TestTransform(ObjectSubclass<imp::TestTransform>) @extends crate::BaseTransform, gst::Element, gst::Object;
}
impl TestTransform {
pub fn new(name: Option<&str>) -> Self {
glib::Object::builder().property("name", name).build()
}
}
#[test]
fn test_transform_subclass() {
gst::init().unwrap();
let element = TestTransform::new(Some("test"));
assert_eq!(element.name(), "test");
let pipeline = gst::Pipeline::default();
let src = gst::ElementFactory::make("audiotestsrc")
.property("num-buffers", 100i32)
.build()
.unwrap();
let sink = gst::ElementFactory::make("fakesink").build().unwrap();
pipeline
.add_many(&[&src, element.upcast_ref(), &sink])
.unwrap();
gst::Element::link_many(&[&src, element.upcast_ref(), &sink]).unwrap();
pipeline.set_state(gst::State::Playing).unwrap();
let bus = pipeline.bus().unwrap();
let eos = bus.timed_pop_filtered(gst::ClockTime::NONE, &[gst::MessageType::Eos]);
assert!(eos.is_some());
let stats = sink.property::<gst::Structure>("stats");
assert_eq!(stats.get::<u64>("rendered").unwrap(), 50);
pipeline.set_state(gst::State::Null).unwrap();
}
}