diff --git a/gstreamer/src/pad.rs b/gstreamer/src/pad.rs index 1ceec71ee..36ee3a0b3 100644 --- a/gstreamer/src/pad.rs +++ b/gstreamer/src/pad.rs @@ -57,7 +57,7 @@ impl FromGlib for PadProbeId { #[derive(Debug)] pub struct PadProbeInfo<'a> { pub mask: PadProbeType, - pub id: PadProbeId, + pub id: Option, pub offset: u64, pub size: u32, pub data: Option>, @@ -1081,7 +1081,7 @@ unsafe fn create_probe_info<'a>( let flow_ret: FlowReturn = from_glib((*info).ABI.abi.flow_ret); let info = PadProbeInfo { mask: from_glib((*info).type_), - id: PadProbeId(NonZeroU64::new_unchecked((*info).id as u64)), + id: Some(PadProbeId(NonZeroU64::new_unchecked((*info).id as u64))), offset: (*info).offset, size: (*info).size, data: if (*info).data.is_null() { @@ -1866,7 +1866,7 @@ impl + IsA> PadBuilder { mod tests { use super::*; use crate::prelude::*; - use std::sync::mpsc::channel; + use std::sync::{atomic::AtomicUsize, mpsc::channel}; use std::sync::{Arc, Mutex}; #[test] @@ -1996,6 +1996,42 @@ mod tests { assert_eq!(receiver.recv().unwrap(), 3); } + #[test] + fn test_remove_probe_from_probe() { + crate::init().unwrap(); + + let src_pad = crate::Pad::new(Some("src"), crate::PadDirection::Src); + let sink_pad = crate::Pad::builder(Some("sink"), crate::PadDirection::Sink) + .chain_function(|_pad, _parent, _buffer| Ok(crate::FlowSuccess::Ok)) + .build(); + + src_pad.link(&sink_pad).unwrap(); + + let counter = Arc::new(AtomicUsize::new(0)); + let counter_clone = counter.clone(); + src_pad.add_probe(crate::PadProbeType::BUFFER, move |pad, info| { + if let Some(PadProbeData::Buffer(_)) = info.data { + counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst); + pad.remove_probe(info.id.take().expect("no pad probe id")); + } else { + unreachable!(); + } + crate::PadProbeReturn::Handled + }); + + sink_pad.set_active(true).unwrap(); + src_pad.set_active(true).unwrap(); + + assert!(src_pad.push_event(crate::event::StreamStart::new("test"))); + let segment = crate::FormattedSegment::::new(); + assert!(src_pad.push_event(crate::event::Segment::new(segment.as_ref()))); + + assert_eq!(src_pad.push(crate::Buffer::new()), Ok(FlowSuccess::Ok)); + assert_eq!(src_pad.push(crate::Buffer::new()), Ok(FlowSuccess::Ok)); + + assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1); + } + #[test] fn test_probe() { crate::init().unwrap();