gstreamer/pad: Add support for removing pad probes from the probe callback

While this allows to remove the pad probe multiple times, which would
cause a g_warning(), this is not actually making the situation worse
than before while making some code patterns easier to implement:

  - Probes could already be removed twice by return
    gst::PadProbeReturn::Remove and then calling pad.remove_probe()
  - Probes could be removed from a different pad than where they were
    added

As such let's go with the simple solution here for now and allow giving
owned access to the probe id from the probe callback.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/-/issues/286
This commit is contained in:
Sebastian Dröge 2021-02-28 18:50:44 +02:00
parent 8b71f5331a
commit 932b8f813d

View file

@ -57,7 +57,7 @@ impl FromGlib<libc::c_ulong> for PadProbeId {
#[derive(Debug)]
pub struct PadProbeInfo<'a> {
pub mask: PadProbeType,
pub id: PadProbeId,
pub id: Option<PadProbeId>,
pub offset: u64,
pub size: u32,
pub data: Option<PadProbeData<'a>>,
@ -1081,7 +1081,7 @@ unsafe fn create_probe_info<'a>(
let flow_ret: FlowReturn = from_glib((*info).ABI.abi.flow_ret);
let info = PadProbeInfo {
mask: from_glib((*info).type_),
id: PadProbeId(NonZeroU64::new_unchecked((*info).id as u64)),
id: Some(PadProbeId(NonZeroU64::new_unchecked((*info).id as u64))),
offset: (*info).offset,
size: (*info).size,
data: if (*info).data.is_null() {
@ -1866,7 +1866,7 @@ impl<T: IsA<Pad> + IsA<glib::Object>> PadBuilder<T> {
mod tests {
use super::*;
use crate::prelude::*;
use std::sync::mpsc::channel;
use std::sync::{atomic::AtomicUsize, mpsc::channel};
use std::sync::{Arc, Mutex};
#[test]
@ -1996,6 +1996,42 @@ mod tests {
assert_eq!(receiver.recv().unwrap(), 3);
}
#[test]
fn test_remove_probe_from_probe() {
crate::init().unwrap();
let src_pad = crate::Pad::new(Some("src"), crate::PadDirection::Src);
let sink_pad = crate::Pad::builder(Some("sink"), crate::PadDirection::Sink)
.chain_function(|_pad, _parent, _buffer| Ok(crate::FlowSuccess::Ok))
.build();
src_pad.link(&sink_pad).unwrap();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = counter.clone();
src_pad.add_probe(crate::PadProbeType::BUFFER, move |pad, info| {
if let Some(PadProbeData::Buffer(_)) = info.data {
counter_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
pad.remove_probe(info.id.take().expect("no pad probe id"));
} else {
unreachable!();
}
crate::PadProbeReturn::Handled
});
sink_pad.set_active(true).unwrap();
src_pad.set_active(true).unwrap();
assert!(src_pad.push_event(crate::event::StreamStart::new("test")));
let segment = crate::FormattedSegment::<crate::ClockTime>::new();
assert!(src_pad.push_event(crate::event::Segment::new(segment.as_ref())));
assert_eq!(src_pad.push(crate::Buffer::new()), Ok(FlowSuccess::Ok));
assert_eq!(src_pad.push(crate::Buffer::new()), Ok(FlowSuccess::Ok));
assert_eq!(counter.load(std::sync::atomic::Ordering::SeqCst), 1);
}
#[test]
fn test_probe() {
crate::init().unwrap();