Clean up usage of pad probes

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1361>
This commit is contained in:
Sebastian Dröge 2023-10-16 19:16:52 +03:00
parent 50dd519c4f
commit d468e1e4a6
7 changed files with 195 additions and 184 deletions

View file

@ -365,19 +365,21 @@ fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video:
fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) { fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) {
enc.static_pad("src").unwrap().add_probe( enc.static_pad("src").unwrap().add_probe(
gst::PadProbeType::EVENT_DOWNSTREAM, gst::PadProbeType::EVENT_DOWNSTREAM,
move |_pad, info| match info.data { move |_pad, info| {
Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { let Some(ev) = info.event() else {
gst::EventView::Caps(e) => { return gst::PadProbeReturn::Ok;
let mime = gst_pbutils::codec_utils_caps_get_mime_codec(e.caps()); };
let gst::EventView::Caps(ev) = ev.view() else {
return gst::PadProbeReturn::Ok;
};
let mut state = state.lock().unwrap(); let mime = gst_pbutils::codec_utils_caps_get_mime_codec(ev.caps());
state.all_mimes.push(mime.unwrap().into());
state.maybe_write_manifest(); let mut state = state.lock().unwrap();
gst::PadProbeReturn::Remove state.all_mimes.push(mime.unwrap().into());
} state.maybe_write_manifest();
_ => gst::PadProbeReturn::Ok,
}, gst::PadProbeReturn::Remove
_ => gst::PadProbeReturn::Ok,
}, },
); );
} }

View file

@ -260,19 +260,21 @@ fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video:
fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) { fn probe_encoder(state: Arc<Mutex<State>>, enc: gst::Element) {
enc.static_pad("src").unwrap().add_probe( enc.static_pad("src").unwrap().add_probe(
gst::PadProbeType::EVENT_DOWNSTREAM, gst::PadProbeType::EVENT_DOWNSTREAM,
move |_pad, info| match info.data { move |_pad, info| {
Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { let Some(ev) = info.event() else {
gst::EventView::Caps(e) => { return gst::PadProbeReturn::Ok;
let mime = gst_pbutils::codec_utils_caps_get_mime_codec(e.caps()); };
let gst::EventView::Caps(ev) = ev.view() else {
return gst::PadProbeReturn::Ok;
};
let mut state = state.lock().unwrap(); let mime = gst_pbutils::codec_utils_caps_get_mime_codec(ev.caps());
state.all_mimes.push(mime.unwrap().into());
state.maybe_write_manifest(); let mut state = state.lock().unwrap();
gst::PadProbeReturn::Remove state.all_mimes.push(mime.unwrap().into());
} state.maybe_write_manifest();
_ => gst::PadProbeReturn::Ok,
}, gst::PadProbeReturn::Remove
_ => gst::PadProbeReturn::Ok,
}, },
); );
} }

View file

@ -358,15 +358,20 @@ impl WebRTCSrc {
.build(); .build();
if self.settings.lock().unwrap().enable_data_channel_navigation { if self.settings.lock().unwrap().enable_data_channel_navigation {
pad.add_probe(gst::PadProbeType::EVENT_UPSTREAM, pad.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(@weak self as this => @default-panic, move |_pad, info| { glib::clone!(@weak self as this => @default-panic, move |_pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data { let Some(ev) = info.event() else {
if let gst::EventView::Navigation(ev) = ev.view() { return gst::PadProbeReturn::Ok;
this.send_navigation_event (gst_video::NavigationEvent::parse(ev).unwrap()); };
} if ev.type_() != gst::EventType::Navigation {
} return gst::PadProbeReturn::Ok;
};
this.send_navigation_event (gst_video::NavigationEvent::parse(ev).unwrap());
gst::PadProbeReturn::Ok gst::PadProbeReturn::Ok
}) }),
); );
} }

View file

@ -1911,77 +1911,79 @@ impl FallbackSrc {
let imp = element.imp(); let imp = element.imp();
match info.data { let Some(ev) = info.event() else {
Some(gst::PadProbeData::Event(ref ev)) if ev.type_() == gst::EventType::Eos => { return gst::PadProbeReturn::Ok;
gst::debug!( };
CAT,
obj: element,
"Received EOS from {}source on pad {}",
if fallback_source { "fallback " } else { "" },
pad.name()
);
let mut state_guard = imp.state.lock(); if ev.type_() != gst::EventType::Eos {
let state = match &mut *state_guard { return gst::PadProbeReturn::Ok;
None => { }
return gst::PadProbeReturn::Ok;
}
Some(state) => state,
};
if is_image { gst::debug!(
gst::PadProbeReturn::Ok CAT,
} else if state.settings.restart_on_eos || fallback_source { obj: element,
imp.handle_source_error(state, RetryReason::Eos, fallback_source); "Received EOS from {}source on pad {}",
drop(state_guard); if fallback_source { "fallback " } else { "" },
element.notify("statistics"); pad.name()
);
gst::PadProbeReturn::Drop let mut state_guard = imp.state.lock();
let state = match &mut *state_guard {
None => {
return gst::PadProbeReturn::Ok;
}
Some(state) => state,
};
if is_image {
gst::PadProbeReturn::Ok
} else if state.settings.restart_on_eos || fallback_source {
imp.handle_source_error(state, RetryReason::Eos, fallback_source);
drop(state_guard);
element.notify("statistics");
gst::PadProbeReturn::Drop
} else {
// Send EOS to all sinkpads of the fallbackswitch and also to the other
// stream's fallbackswitch if it doesn't have a main branch.
let mut sinkpads = vec![];
if let Some(stream) = {
if is_video {
state.video_stream.as_ref()
} else { } else {
// Send EOS to all sinkpads of the fallbackswitch and also to the other state.audio_stream.as_ref()
// stream's fallbackswitch if it doesn't have a main branch. }
let mut sinkpads = vec![]; } {
sinkpads.extend(stream.switch.sink_pads().into_iter().filter(|p| p != pad));
}
if let Some(stream) = { if let Some(other_stream) = {
if is_video { if is_video {
state.video_stream.as_ref() state.audio_stream.as_ref()
} else { } else {
state.audio_stream.as_ref() state.video_stream.as_ref()
} }
} { } {
sinkpads if other_stream.main_branch.is_none() {
.extend(stream.switch.sink_pads().into_iter().filter(|p| p != pad)); sinkpads.extend(
} other_stream
.switch
if let Some(other_stream) = { .sink_pads()
if is_video { .into_iter()
state.audio_stream.as_ref() .filter(|p| p != pad),
} else { );
state.video_stream.as_ref()
}
} {
if other_stream.main_branch.is_none() {
sinkpads.extend(
other_stream
.switch
.sink_pads()
.into_iter()
.filter(|p| p != pad),
);
}
}
let event = ev.clone();
element.call_async(move |_| {
for sinkpad in sinkpads {
sinkpad.send_event(event.clone());
}
});
gst::PadProbeReturn::Ok
} }
} }
_ => gst::PadProbeReturn::Ok,
let event = ev.clone();
element.call_async(move |_| {
for sinkpad in sinkpads {
sinkpad.send_event(event.clone());
}
});
gst::PadProbeReturn::Ok
} }
}); });
@ -2056,13 +2058,15 @@ impl FallbackSrc {
let qos_probe_id = block_pad let qos_probe_id = block_pad
.add_probe(gst::PadProbeType::EVENT_UPSTREAM, |_pad, info| { .add_probe(gst::PadProbeType::EVENT_UPSTREAM, |_pad, info| {
if let Some(gst::PadProbeData::Event(ref ev)) = info.data { let Some(ev) = info.event() else {
if let gst::EventView::Qos(_) = ev.view() { return gst::PadProbeReturn::Ok;
return gst::PadProbeReturn::Drop; };
}
if ev.type_() != gst::EventType::Qos {
return gst::PadProbeReturn::Ok;
} }
gst::PadProbeReturn::Ok gst::PadProbeReturn::Drop
}) })
.unwrap(); .unwrap();
@ -2871,19 +2875,17 @@ impl FallbackSrc {
// error. We don't need to remove these pad probes because restarting the source will also // error. We don't need to remove these pad probes because restarting the source will also
// remove/add the pads again. // remove/add the pads again.
for pad in source.source.src_pads() { for pad in source.source.src_pads() {
pad.add_probe( pad.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, |_pad, info| {
gst::PadProbeType::EVENT_DOWNSTREAM, let Some(ev) = info.event() else {
|_pad, info| match info.data { return gst::PadProbeReturn::Pass;
Some(gst::PadProbeData::Event(ref event)) => { };
if event.type_() == gst::EventType::Eos {
gst::PadProbeReturn::Drop if ev.type_() != gst::EventType::Eos {
} else { return gst::PadProbeReturn::Pass;
gst::PadProbeReturn::Ok }
}
} gst::PadProbeReturn::Drop
_ => unreachable!(), })
},
)
.unwrap(); .unwrap();
} }

View file

@ -66,14 +66,12 @@ fn setup_sender_receiver(
sinkpad.add_probe( sinkpad.add_probe(
gst::PadProbeType::QUERY_UPSTREAM, gst::PadProbeType::QUERY_UPSTREAM,
move |_pad, probe_info| { move |_pad, probe_info| {
let query = match &mut probe_info.data { let Some(query) = probe_info.query_mut() else {
Some(gst::PadProbeData::Query(q)) => q, unreachable!();
_ => unreachable!(),
}; };
use gst::QueryViewMut::*;
match query.view_mut() { match query.view_mut() {
Latency(q) => { gst::QueryViewMut::Latency(q) => {
q.set(live, gst::ClockTime::ZERO, None); q.set(live, gst::ClockTime::ZERO, None);
gst::PadProbeReturn::Handled gst::PadProbeReturn::Handled
} }

View file

@ -1286,35 +1286,36 @@ impl UriPlaylistBin {
let src_pad_name = sync_sink.name().to_string().replace("sink", "src"); let src_pad_name = sync_sink.name().to_string().replace("sink", "src");
let sync_src = state.streamsynchronizer.static_pad(&src_pad_name).unwrap(); let sync_src = state.streamsynchronizer.static_pad(&src_pad_name).unwrap();
sync_src.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_pad, info| { sync_src.add_probe(gst::PadProbeType::EVENT_DOWNSTREAM, move |_pad, info| {
match info.data { let Some(ev) = info.event() else {
Some(gst::PadProbeData::Event(ref ev)) return gst::PadProbeReturn::Pass;
if ev.type_() == gst::EventType::Eos => };
{
let element = match element_weak.upgrade() {
Some(element) => element,
None => return gst::PadProbeReturn::Remove,
};
let imp = element.imp();
let item = { if ev.type_() != gst::EventType::Eos {
let mut state_guard = imp.state.lock().unwrap(); return gst::PadProbeReturn::Pass;
let state = state_guard.as_mut().unwrap(); }
state.waiting_for_ss_eos.as_ref().cloned()
};
if let Some(item) = item { let element = match element_weak.upgrade() {
if item.dec_waiting_eos_ss() { Some(element) => element,
gst::debug!(CAT, imp: imp, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item"); None => return gst::PadProbeReturn::Remove,
imp.handle_topology_change(); };
gst::PadProbeReturn::Drop let imp = element.imp();
} else {
gst::PadProbeReturn::Drop let item = {
} let mut state_guard = imp.state.lock().unwrap();
} else { let state = state_guard.as_mut().unwrap();
gst::PadProbeReturn::Pass state.waiting_for_ss_eos.as_ref().cloned()
} };
if let Some(item) = item {
if item.dec_waiting_eos_ss() {
gst::debug!(CAT, imp: imp, "streamsynchronizer has been flushed, reorganize pipeline to fit new streams topology and unblock item");
imp.handle_topology_change();
gst::PadProbeReturn::Drop
} else {
gst::PadProbeReturn::Drop
} }
_ => gst::PadProbeReturn::Pass, } else {
gst::PadProbeReturn::Pass
} }
}); });
@ -1434,47 +1435,48 @@ impl UriPlaylistBin {
gst::PadProbeReturn::Pass gst::PadProbeReturn::Pass
} else { } else {
match info.data { let Some(ev) = info.event() else {
Some(gst::PadProbeData::Event(ref ev)) return gst::PadProbeReturn::Pass;
if ev.type_() == gst::EventType::Eos => };
{
if item.dec_waiting_eos() {
// all the streams are eos, item is now done
gst::log!(
CAT,
obj: element,
"all streams of item #{} are eos",
item.index()
);
let imp = element.imp(); if ev.type_() != gst::EventType::Eos {
{ return gst::PadProbeReturn::Pass;
let mut state_guard = imp.state.lock().unwrap();
let state = state_guard.as_mut().unwrap();
let index = item.index();
let removed = state
.streaming
.iter()
.position(|i| i.index() == index)
.map(|e| state.streaming.remove(e));
if let Some(item) = removed {
item.set_done();
state.done.push(item);
}
}
if let Err(e) = imp.start_next_item() {
imp.failed(e);
}
}
gst::PadProbeReturn::Remove
}
_ => gst::PadProbeReturn::Pass,
} }
if item.dec_waiting_eos() {
// all the streams are eos, item is now done
gst::log!(
CAT,
obj: element,
"all streams of item #{} are eos",
item.index()
);
let imp = element.imp();
{
let mut state_guard = imp.state.lock().unwrap();
let state = state_guard.as_mut().unwrap();
let index = item.index();
let removed = state
.streaming
.iter()
.position(|i| i.index() == index)
.map(|e| state.streaming.remove(e));
if let Some(item) = removed {
item.set_done();
state.done.push(item);
}
}
if let Err(e) = imp.start_next_item() {
imp.failed(e);
}
}
gst::PadProbeReturn::Remove
} }
}); });

View file

@ -300,7 +300,7 @@ impl TranscriberBin {
return gst::PadProbeReturn::Pass; return gst::PadProbeReturn::Pass;
} }
if let Some(gst::PadProbeData::Buffer(buffer)) = &mut probe_info.data { if let Some(buffer) = probe_info.buffer_mut() {
let buffer = buffer.make_mut(); let buffer = buffer.make_mut();
while let Some(meta) = buffer.meta_mut::<gst_video::VideoCaptionMeta>() { while let Some(meta) = buffer.meta_mut::<gst_video::VideoCaptionMeta>() {
meta.remove().unwrap(); meta.remove().unwrap();