diff --git a/mux/fmp4/examples/hls_live.rs b/mux/fmp4/examples/hls_live.rs index 1843c612..4f58a6a8 100644 --- a/mux/fmp4/examples/hls_live.rs +++ b/mux/fmp4/examples/hls_live.rs @@ -365,19 +365,21 @@ fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: fn probe_encoder(state: Arc>, enc: gst::Element) { enc.static_pad("src").unwrap().add_probe( gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, info| match info.data { - Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { - gst::EventView::Caps(e) => { - let mime = gst_pbutils::codec_utils_caps_get_mime_codec(e.caps()); + move |_pad, info| { + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + let gst::EventView::Caps(ev) = ev.view() else { + return gst::PadProbeReturn::Ok; + }; - let mut state = state.lock().unwrap(); - state.all_mimes.push(mime.unwrap().into()); - state.maybe_write_manifest(); - gst::PadProbeReturn::Remove - } - _ => gst::PadProbeReturn::Ok, - }, - _ => gst::PadProbeReturn::Ok, + let mime = gst_pbutils::codec_utils_caps_get_mime_codec(ev.caps()); + + let mut state = state.lock().unwrap(); + state.all_mimes.push(mime.unwrap().into()); + state.maybe_write_manifest(); + + gst::PadProbeReturn::Remove }, ); } diff --git a/mux/fmp4/examples/hls_vod.rs b/mux/fmp4/examples/hls_vod.rs index a31f17db..c84102f4 100644 --- a/mux/fmp4/examples/hls_vod.rs +++ b/mux/fmp4/examples/hls_vod.rs @@ -260,19 +260,21 @@ fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: fn probe_encoder(state: Arc>, enc: gst::Element) { enc.static_pad("src").unwrap().add_probe( gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, info| match info.data { - Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { - gst::EventView::Caps(e) => { - let mime = gst_pbutils::codec_utils_caps_get_mime_codec(e.caps()); + move |_pad, info| { + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + let gst::EventView::Caps(ev) = ev.view() else { + return gst::PadProbeReturn::Ok; + }; - let mut state = state.lock().unwrap(); - state.all_mimes.push(mime.unwrap().into()); - state.maybe_write_manifest(); - gst::PadProbeReturn::Remove - } - _ => gst::PadProbeReturn::Ok, - }, - _ => gst::PadProbeReturn::Ok, + let mime = gst_pbutils::codec_utils_caps_get_mime_codec(ev.caps()); + + let mut state = state.lock().unwrap(); + state.all_mimes.push(mime.unwrap().into()); + state.maybe_write_manifest(); + + gst::PadProbeReturn::Remove }, ); } diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index a9d6d38c..465ca34c 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -358,15 +358,20 @@ impl WebRTCSrc { .build(); if self.settings.lock().unwrap().enable_data_channel_navigation { - pad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, + pad.add_probe( + gst::PadProbeType::EVENT_UPSTREAM, glib::clone!(@weak self as this => @default-panic, move |_pad, info| { - if let Some(gst::PadProbeData::Event(ref ev)) = info.data { - if let gst::EventView::Navigation(ev) = ev.view() { - this.send_navigation_event (gst_video::NavigationEvent::parse(ev).unwrap()); - } - } + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + if ev.type_() != gst::EventType::Navigation { + return gst::PadProbeReturn::Ok; + }; + + this.send_navigation_event (gst_video::NavigationEvent::parse(ev).unwrap()); + gst::PadProbeReturn::Ok - }) + }), ); } diff --git a/utils/fallbackswitch/src/fallbacksrc/imp.rs b/utils/fallbackswitch/src/fallbacksrc/imp.rs index 745ad849..2001758f 100644 --- a/utils/fallbackswitch/src/fallbacksrc/imp.rs +++ b/utils/fallbackswitch/src/fallbacksrc/imp.rs @@ -1911,77 +1911,79 @@ impl FallbackSrc { let imp = element.imp(); - match info.data { - Some(gst::PadProbeData::Event(ref ev)) if ev.type_() == gst::EventType::Eos => { - gst::debug!( - CAT, - obj: element, - "Received EOS from {}source on pad {}", - if fallback_source { "fallback " } else { "" }, - pad.name() - ); + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; - let mut state_guard = imp.state.lock(); - let state = match &mut *state_guard { - None => { - return gst::PadProbeReturn::Ok; - } - Some(state) => state, - }; + if ev.type_() != gst::EventType::Eos { + return gst::PadProbeReturn::Ok; + } - if is_image { - gst::PadProbeReturn::Ok - } else if state.settings.restart_on_eos || fallback_source { - imp.handle_source_error(state, RetryReason::Eos, fallback_source); - drop(state_guard); - element.notify("statistics"); + gst::debug!( + CAT, + obj: element, + "Received EOS from {}source on pad {}", + if fallback_source { "fallback " } else { "" }, + pad.name() + ); - gst::PadProbeReturn::Drop + let mut state_guard = imp.state.lock(); + let state = match &mut *state_guard { + None => { + return gst::PadProbeReturn::Ok; + } + Some(state) => state, + }; + + if is_image { + gst::PadProbeReturn::Ok + } else if state.settings.restart_on_eos || fallback_source { + imp.handle_source_error(state, RetryReason::Eos, fallback_source); + drop(state_guard); + element.notify("statistics"); + + gst::PadProbeReturn::Drop + } else { + // Send EOS to all sinkpads of the fallbackswitch and also to the other + // stream's fallbackswitch if it doesn't have a main branch. + let mut sinkpads = vec![]; + + if let Some(stream) = { + if is_video { + state.video_stream.as_ref() } else { - // Send EOS to all sinkpads of the fallbackswitch and also to the other - // stream's fallbackswitch if it doesn't have a main branch. - let mut sinkpads = vec![]; + state.audio_stream.as_ref() + } + } { + sinkpads.extend(stream.switch.sink_pads().into_iter().filter(|p| p != pad)); + } - if let Some(stream) = { - if is_video { - state.video_stream.as_ref() - } else { - state.audio_stream.as_ref() - } - } { - sinkpads - .extend(stream.switch.sink_pads().into_iter().filter(|p| p != pad)); - } - - if let Some(other_stream) = { - if is_video { - state.audio_stream.as_ref() - } else { - state.video_stream.as_ref() - } - } { - if other_stream.main_branch.is_none() { - sinkpads.extend( - other_stream - .switch - .sink_pads() - .into_iter() - .filter(|p| p != pad), - ); - } - } - - let event = ev.clone(); - element.call_async(move |_| { - for sinkpad in sinkpads { - sinkpad.send_event(event.clone()); - } - }); - - gst::PadProbeReturn::Ok + if let Some(other_stream) = { + if is_video { + state.audio_stream.as_ref() + } else { + state.video_stream.as_ref() + } + } { + if other_stream.main_branch.is_none() { + sinkpads.extend( + other_stream + .switch + .sink_pads() + .into_iter() + .filter(|p| p != pad), + ); } } - _ => gst::PadProbeReturn::Ok, + + let event = ev.clone(); + element.call_async(move |_| { + for sinkpad in sinkpads { + sinkpad.send_event(event.clone()); + } + }); + + gst::PadProbeReturn::Ok } }); @@ -2056,13 +2058,15 @@ impl FallbackSrc { let qos_probe_id = block_pad .add_probe(gst::PadProbeType::EVENT_UPSTREAM, |_pad, info| { - if let Some(gst::PadProbeData::Event(ref ev)) = info.data { - if let gst::EventView::Qos(_) = ev.view() { - return gst::PadProbeReturn::Drop; - } + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Ok; + }; + + if ev.type_() != gst::EventType::Qos { + return gst::PadProbeReturn::Ok; } - gst::PadProbeReturn::Ok + gst::PadProbeReturn::Drop }) .unwrap(); @@ -2871,19 +2875,17 @@ impl FallbackSrc { // error. We don't need to remove these pad probes because restarting the source will also // remove/add the pads again. for pad in source.source.src_pads() { - pad.add_probe( - gst::PadProbeType::EVENT_DOWNSTREAM, - |_pad, info| match info.data { - Some(gst::PadProbeData::Event(ref event)) => { - if event.type_() == gst::EventType::Eos { - gst::PadProbeReturn::Drop - } else { - gst::PadProbeReturn::Ok - } - } - _ => unreachable!(), - }, - ) + pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |_pad, info| { + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Pass; + }; + + if ev.type_() != gst::EventType::Eos { + return gst::PadProbeReturn::Pass; + } + + gst::PadProbeReturn::Drop + }) .unwrap(); } diff --git a/utils/togglerecord/tests/tests.rs b/utils/togglerecord/tests/tests.rs index e59fcdfc..e3614fc1 100644 --- a/utils/togglerecord/tests/tests.rs +++ b/utils/togglerecord/tests/tests.rs @@ -66,14 +66,12 @@ fn setup_sender_receiver( sinkpad.add_probe( gst::PadProbeType::QUERY_UPSTREAM, move |_pad, probe_info| { - let query = match &mut probe_info.data { - Some(gst::PadProbeData::Query(q)) => q, - _ => unreachable!(), + let Some(query) = probe_info.query_mut() else { + unreachable!(); }; - use gst::QueryViewMut::*; match query.view_mut() { - Latency(q) => { + gst::QueryViewMut::Latency(q) => { q.set(live, gst::ClockTime::ZERO, None); gst::PadProbeReturn::Handled } diff --git a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs index d213b05c..c8f32f27 100644 --- a/utils/uriplaylistbin/src/uriplaylistbin/imp.rs +++ b/utils/uriplaylistbin/src/uriplaylistbin/imp.rs @@ -1286,35 +1286,36 @@ impl UriPlaylistBin { let src_pad_name = sync_sink.name().to_string().replace("sink", "src"); let sync_src = state.streamsynchronizer.static_pad(&src_pad_name).unwrap(); sync_src.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_pad, info| { - match info.data { - Some(gst::PadProbeData::Event(ref ev)) - if ev.type_() == gst::EventType::Eos => - { - let element = match element_weak.upgrade() { - Some(element) => element, - None => return gst::PadProbeReturn::Remove, - }; - let imp = element.imp(); + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Pass; + }; - let item = { - let mut state_guard = imp.state.lock().unwrap(); - let state = state_guard.as_mut().unwrap(); - state.waiting_for_ss_eos.as_ref().cloned() - }; + if ev.type_() != gst::EventType::Eos { + return gst::PadProbeReturn::Pass; + } - if let Some(item) = item { - if item.dec_waiting_eos_ss() { - gst::debug!(CAT, imp: imp, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item"); - imp.handle_topology_change(); - gst::PadProbeReturn::Drop - } else { - gst::PadProbeReturn::Drop - } - } else { - gst::PadProbeReturn::Pass - } + let element = match element_weak.upgrade() { + Some(element) => element, + None => return gst::PadProbeReturn::Remove, + }; + let imp = element.imp(); + + let item = { + let mut state_guard = imp.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + state.waiting_for_ss_eos.as_ref().cloned() + }; + + if let Some(item) = item { + if item.dec_waiting_eos_ss() { + gst::debug!(CAT, imp: imp, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item"); + imp.handle_topology_change(); + gst::PadProbeReturn::Drop + } else { + gst::PadProbeReturn::Drop } - _ => gst::PadProbeReturn::Pass, + } else { + gst::PadProbeReturn::Pass } }); @@ -1434,47 +1435,48 @@ impl UriPlaylistBin { gst::PadProbeReturn::Pass } else { - match info.data { - Some(gst::PadProbeData::Event(ref ev)) - if ev.type_() == gst::EventType::Eos => - { - if item.dec_waiting_eos() { - // all the streams are eos, item is now done - gst::log!( - CAT, - obj: element, - "all streams of item #{} are eos", - item.index() - ); + let Some(ev) = info.event() else { + return gst::PadProbeReturn::Pass; + }; - let imp = element.imp(); - { - let mut state_guard = imp.state.lock().unwrap(); - let state = state_guard.as_mut().unwrap(); - - let index = item.index(); - - let removed = state - .streaming - .iter() - .position(|i| i.index() == index) - .map(|e| state.streaming.remove(e)); - - if let Some(item) = removed { - item.set_done(); - state.done.push(item); - } - } - - if let Err(e) = imp.start_next_item() { - imp.failed(e); - } - } - - gst::PadProbeReturn::Remove - } - _ => gst::PadProbeReturn::Pass, + if ev.type_() != gst::EventType::Eos { + return gst::PadProbeReturn::Pass; } + + if item.dec_waiting_eos() { + // all the streams are eos, item is now done + gst::log!( + CAT, + obj: element, + "all streams of item #{} are eos", + item.index() + ); + + let imp = element.imp(); + { + let mut state_guard = imp.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + + let index = item.index(); + + let removed = state + .streaming + .iter() + .position(|i| i.index() == index) + .map(|e| state.streaming.remove(e)); + + if let Some(item) = removed { + item.set_done(); + state.done.push(item); + } + } + + if let Err(e) = imp.start_next_item() { + imp.failed(e); + } + } + + gst::PadProbeReturn::Remove } }); diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 7c7a823d..bec46f68 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -300,7 +300,7 @@ impl TranscriberBin { return gst::PadProbeReturn::Pass; } - if let Some(gst::PadProbeData::Buffer(buffer)) = &mut probe_info.data { + if let Some(buffer) = probe_info.buffer_mut() { let buffer = buffer.make_mut(); while let Some(meta) = buffer.meta_mut::() { meta.remove().unwrap();