diff --git a/examples/src/bin/custom_events.rs b/examples/src/bin/custom_events.rs index 07a7f5aed..591da1f08 100644 --- a/examples/src/bin/custom_events.rs +++ b/examples/src/bin/custom_events.rs @@ -75,31 +75,33 @@ fn example_main() { // Add a pad probe on the sink pad and catch the custom event we sent, then send // an EOS event on the pipeline. sinkpad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_, probe_info| { - match probe_info.data { - Some(gst::PadProbeData::Event(ref ev)) - if ev.type_() == gst::EventType::CustomDownstream => - { - if let Some(custom_event) = ExampleCustomEvent::parse(ev) { - if let Some(pipeline) = pipeline_weak.upgrade() { - if custom_event.send_eos { - /* Send EOS event to shut down the pipeline, but from an async callback, as we're - * in a pad probe blocking the stream thread here... */ - println!("Got custom event with send_eos=true. Sending EOS"); - let ev = gst::event::Eos::new(); - let pipeline_weak = pipeline_weak.clone(); - pipeline.call_async(move |_| { - if let Some(pipeline) = pipeline_weak.upgrade() { - pipeline.send_event(ev); - } - }); - } else { - println!("Got custom event, with send_eos=false. Ignoring"); - } - } + let Some(event) = probe_info.event() else { + return gst::PadProbeReturn::Ok; + }; + + let Some(custom_event) = ExampleCustomEvent::parse(event) else { + return gst::PadProbeReturn::Ok; + }; + + let Some(pipeline) = pipeline_weak.upgrade() else { + return gst::PadProbeReturn::Ok; + }; + + if custom_event.send_eos { + /* Send EOS event to shut down the pipeline, but from an async callback, as we're + * in a pad probe blocking the stream thread here... */ + println!("Got custom event with send_eos=true. Sending EOS"); + let ev = gst::event::Eos::new(); + let pipeline_weak = pipeline_weak.clone(); + pipeline.call_async(move |_| { + if let Some(pipeline) = pipeline_weak.upgrade() { + pipeline.send_event(ev); } - } - _ => (), + }); + } else { + println!("Got custom event, with send_eos=false. Ignoring"); } + gst::PadProbeReturn::Ok }); diff --git a/examples/src/bin/d3d11videosink.rs b/examples/src/bin/d3d11videosink.rs index e889c80b2..36fe79e91 100644 --- a/examples/src/bin/d3d11videosink.rs +++ b/examples/src/bin/d3d11videosink.rs @@ -305,19 +305,18 @@ fn main() -> Result<()> { let sinkpad = videosink.static_pad("sink").unwrap(); let overlay_context_weak = Arc::downgrade(&overlay_context); sinkpad.add_probe(gst::PadProbeType::BUFFER, move |_, probe_info| { - if let Some(gst::PadProbeData::Buffer(_)) = probe_info.data { - let overlay_context = overlay_context_weak.upgrade().unwrap(); - let mut context = overlay_context.lock().unwrap(); - context.timestamp_queue.push_back(SystemTime::now()); - // Updates framerate per 10 frames - if context.timestamp_queue.len() >= 10 { - let now = context.timestamp_queue.back().unwrap(); - let front = context.timestamp_queue.front().unwrap(); - let duration = now.duration_since(*front).unwrap().as_millis() as f32; - context.avg_fps = 1000f32 * (context.timestamp_queue.len() - 1) as f32 / duration; - context.timestamp_queue.clear(); - } + let overlay_context = overlay_context_weak.upgrade().unwrap(); + let mut context = overlay_context.lock().unwrap(); + context.timestamp_queue.push_back(SystemTime::now()); + // Updates framerate per 10 frames + if context.timestamp_queue.len() >= 10 { + let now = context.timestamp_queue.back().unwrap(); + let front = context.timestamp_queue.front().unwrap(); + let duration = now.duration_since(*front).unwrap().as_millis() as f32; + context.avg_fps = 1000f32 * (context.timestamp_queue.len() - 1) as f32 / duration; + context.timestamp_queue.clear(); } + gst::PadProbeReturn::Ok }); diff --git a/examples/src/bin/pad_probes.rs b/examples/src/bin/pad_probes.rs index 8dbaa58b7..624a232a3 100644 --- a/examples/src/bin/pad_probes.rs +++ b/examples/src/bin/pad_probes.rs @@ -38,36 +38,38 @@ fn example_main() { // This handler gets called for every buffer that passes the pad we probe. src_pad.add_probe(gst::PadProbeType::BUFFER, |_, probe_info| { // Interpret the data sent over the pad as one buffer - if let Some(gst::PadProbeData::Buffer(ref buffer)) = probe_info.data { - // At this point, buffer is only a reference to an existing memory region somewhere. - // When we want to access its content, we have to map it while requesting the required - // mode of access (read, read/write). - // This type of abstraction is necessary, because the buffer in question might not be - // on the machine's main memory itself, but rather in the GPU's memory. - // So mapping the buffer makes the underlying memory region accessible to us. - // See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html - let map = buffer.map_readable().unwrap(); + let Some(buffer) = probe_info.buffer() else { + return gst::PadProbeReturn::Ok; + }; - // We know what format the data in the memory region has, since we requested - // it by setting the appsink's caps. So what we do here is interpret the - // memory region we mapped as an array of signed 16 bit integers. - let samples = if let Ok(samples) = map.as_slice_of::() { - samples - } else { - return gst::PadProbeReturn::Ok; - }; + // At this point, buffer is only a reference to an existing memory region somewhere. + // When we want to access its content, we have to map it while requesting the required + // mode of access (read, read/write). + // This type of abstraction is necessary, because the buffer in question might not be + // on the machine's main memory itself, but rather in the GPU's memory. + // So mapping the buffer makes the underlying memory region accessible to us. + // See: https://gstreamer.freedesktop.org/documentation/plugin-development/advanced/allocation.html + let map = buffer.map_readable().unwrap(); - // For buffer (= chunk of samples), we calculate the root mean square: - let sum: f64 = samples - .iter() - .map(|sample| { - let f = f64::from(*sample) / f64::from(i16::MAX); - f * f - }) - .sum(); - let rms = (sum / (samples.len() as f64)).sqrt(); - println!("rms: {rms}"); - } + // We know what format the data in the memory region has, since we requested + // it by setting the appsink's caps. So what we do here is interpret the + // memory region we mapped as an array of signed 16 bit integers. + let samples = if let Ok(samples) = map.as_slice_of::() { + samples + } else { + return gst::PadProbeReturn::Ok; + }; + + // For buffer (= chunk of samples), we calculate the root mean square: + let sum: f64 = samples + .iter() + .map(|sample| { + let f = f64::from(*sample) / f64::from(i16::MAX); + f * f + }) + .sum(); + let rms = (sum / (samples.len() as f64)).sqrt(); + println!("rms: {rms}"); gst::PadProbeReturn::Ok }); diff --git a/examples/src/bin/zoom.rs b/examples/src/bin/zoom.rs index 40b0fb99f..59b4802f9 100644 --- a/examples/src/bin/zoom.rs +++ b/examples/src/bin/zoom.rs @@ -96,7 +96,7 @@ fn example_main() { mixer_src_pad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, move |_, probe_info| { let mixer_sink_pad = mixer_sink_pad_weak.upgrade().unwrap(); - let Some(gst::PadProbeData::Event(ref ev)) = probe_info.data else { + let Some(ev) = probe_info.event() else { return gst::PadProbeReturn::Ok; }; diff --git a/gstreamer-utils/src/streamproducer.rs b/gstreamer-utils/src/streamproducer.rs index ceba37125..a741a046d 100644 --- a/gstreamer-utils/src/streamproducer.rs +++ b/gstreamer-utils/src/streamproducer.rs @@ -196,11 +196,13 @@ impl StreamProducer { .add_probe( gst::PadProbeType::EVENT_UPSTREAM, glib::clone!(@weak appsink, @weak consumer => @default-panic, move |_pad, info| { - if let Some(gst::PadProbeData::Event(ref ev)) = info.data { - if gst_video::UpstreamForceKeyUnitEvent::parse(ev).is_ok() { - gst::debug!(CAT, obj: &appsink, "Requesting keyframe"); - let _ = appsink.send_event(ev.clone()); - } + let Some(event) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + + if gst_video::UpstreamForceKeyUnitEvent::parse(event).is_ok() { + gst::debug!(CAT, obj: &appsink, "Requesting keyframe"); + let _ = appsink.send_event(event.clone()); } gst::PadProbeReturn::Ok @@ -486,14 +488,18 @@ impl<'a> From<&'a gst_app::AppSink> for StreamProducer { sinkpad.add_probe( gst::PadProbeType::EVENT_UPSTREAM, glib::clone!(@strong consumers => move |_pad, info| { - if let Some(gst::PadProbeData::Event(ref ev)) = info.data { - if let gst::EventView::Latency(ev) = ev.view() { - let latency = ev.latency(); - let mut consumers = consumers.lock().unwrap(); - consumers.current_latency = Some(latency); - consumers.latency_updated = true; - } - } + let Some(event) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + + let gst::EventView::Latency(event) = event.view() else { + return gst::PadProbeReturn::Ok; + }; + + let latency = event.latency(); + let mut consumers = consumers.lock().unwrap(); + consumers.current_latency = Some(latency); + gst::PadProbeReturn::Ok }), ); diff --git a/gstreamer/src/pad.rs b/gstreamer/src/pad.rs index 8028bf356..c9a28ddb6 100644 --- a/gstreamer/src/pad.rs +++ b/gstreamer/src/pad.rs @@ -56,6 +56,64 @@ pub struct PadProbeInfo<'a> { pub flow_res: Result, } +impl<'a> PadProbeInfo<'a> { + pub fn buffer(&self) -> Option<&Buffer> { + match self.data { + Some(PadProbeData::Buffer(ref buffer)) => Some(buffer), + _ => None, + } + } + + pub fn buffer_mut(&mut self) -> Option<&mut Buffer> { + match self.data { + Some(PadProbeData::Buffer(ref mut buffer)) => Some(buffer), + _ => None, + } + } + + pub fn buffer_list(&self) -> Option<&BufferList> { + match self.data { + Some(PadProbeData::BufferList(ref buffer_list)) => Some(buffer_list), + _ => None, + } + } + + pub fn buffer_list_mut(&mut self) -> Option<&mut BufferList> { + match self.data { + Some(PadProbeData::BufferList(ref mut buffer_list)) => Some(buffer_list), + _ => None, + } + } + + pub fn query(&self) -> Option<&QueryRef> { + match self.data { + Some(PadProbeData::Query(ref query)) => Some(*query), + _ => None, + } + } + + pub fn query_mut(&mut self) -> Option<&mut QueryRef> { + match self.data { + Some(PadProbeData::Query(ref mut query)) => Some(*query), + _ => None, + } + } + + pub fn event(&self) -> Option<&Event> { + match self.data { + Some(PadProbeData::Event(ref event)) => Some(event), + _ => None, + } + } + + pub fn event_mut(&mut self) -> Option<&mut Event> { + match self.data { + Some(PadProbeData::Event(ref mut event)) => Some(event), + _ => None, + } + } +} + #[derive(Debug)] pub enum PadProbeData<'a> { Buffer(Buffer),