From 932b8f813de42d767160417c3dba96cd2c4cbc04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Sun, 28 Feb 2021 18:50:44 +0200 Subject: [PATCH] gstreamer/pad: Add support for removing pad probes from the probe callback While this allows to remove the pad probe multiple times, which would cause a g_warning(), this is not actually making the situation worse than before while making some code patterns easier to implement: - Probes could already be removed twice by return gst::PadProbeReturn::Remove and then calling pad.remove_probe() - Probes could be removed from a different pad than where they were added As such let's go with the simple solution here for now and allow giving owned access to the probe id from the probe callback. Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/286 --- gstreamer/src/pad.rs | 42 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 39 insertions(+), 3 deletions(-) diff --git a/gstreamer/src/pad.rs b/gstreamer/src/pad.rs index 1ceec71ee..36ee3a0b3 100644 --- a/gstreamer/src/pad.rs +++ b/gstreamer/src/pad.rs @@ -57,7 +57,7 @@ impl FromGlib for PadProbeId { #[derive(Debug)] pub struct PadProbeInfo<'a> { pub mask: PadProbeType, - pub id: PadProbeId, + pub id: Option, pub offset: u64, pub size: u32, pub data: Option>, @@ -1081,7 +1081,7 @@ unsafe fn create_probe_info<'a>( let flow_ret: FlowReturn = from_glib((*info).ABI.abi.flow_ret); let info = PadProbeInfo { mask: from_glib((*info).type_), - id: PadProbeId(NonZeroU64::new_unchecked((*info).id as u64)), + id: Some(PadProbeId(NonZeroU64::new_unchecked((*info).id as u64))), offset: (*info).offset, size: (*info).size, data: if (*info).data.is_null() { @@ -1866,7 +1866,7 @@ impl + IsA> PadBuilder { mod tests { use super::*; use crate::prelude::*; - use std::sync::mpsc::channel; + use std::sync::{atomic::AtomicUsize, mpsc::channel}; use std::sync::{Arc, Mutex}; #[test] @@ -1996,6 +1996,42 @@ mod tests { assert_eq!(receiver.recv().unwrap(), 3); } + #[test] + fn test_remove_probe_from_probe() { + crate::init().unwrap(); + + let src_pad = crate::Pad::new(Some("src"), crate::PadDirection::Src); + let sink_pad = crate::Pad::builder(Some("sink"), crate::PadDirection::Sink) + .chain_function(|_pad, _parent, _buffer| Ok(crate::FlowSuccess::Ok)) + .build(); + + src_pad.link(&sink_pad).unwrap(); + + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + src_pad.add_probe(crate::PadProbeType::BUFFER, move |pad, info| { + if let Some(PadProbeData::Buffer(_)) = info.data { + counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + pad.remove_probe(info.id.take().expect("no pad probe id")); + } else { + unreachable!(); + } + crate::PadProbeReturn::Handled + }); + + sink_pad.set_active(true).unwrap(); + src_pad.set_active(true).unwrap(); + + assert!(src_pad.push_event(crate::event::StreamStart::new("test"))); + let segment = crate::FormattedSegment::::new(); + assert!(src_pad.push_event(crate::event::Segment::new(segment.as_ref()))); + + assert_eq!(src_pad.push(crate::Buffer::new()), Ok(FlowSuccess::Ok)); + assert_eq!(src_pad.push(crate::Buffer::new()), Ok(FlowSuccess::Ok)); + + assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1); + } + #[test] fn test_probe() { crate::init().unwrap();