webrtcsink: fix assertions when finalizing

Dumping the pipeline on state changes from an async bus handler was
triggering criticals.

Instead, dump from the sync handler.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1706>
This commit is contained in:
Mathieu Duponchelle 2024-08-12 09:13:06 +02:00
parent 30a5987c9e
commit 0da1c8e9c9

View file

@ -167,10 +167,13 @@ struct CustomBusStream {
}
impl CustomBusStream {
fn new(bin: &super::BaseWebRTCSink, bus: &gst::Bus) -> Self {
fn new(bin: &super::BaseWebRTCSink, pipeline: &gst::Pipeline, prefix: &str) -> Self {
let (sender, receiver) = futures::channel::mpsc::unbounded();
let bus = pipeline.bus().unwrap();
let bin_weak = bin.downgrade();
let pipeline_weak = pipeline.downgrade();
let prefix_clone = prefix.to_string();
bus.set_sync_handler(move |_, msg| {
match msg.view() {
gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => {
@ -178,6 +181,21 @@ impl CustomBusStream {
let _ = bin.post_message(msg.clone());
}
}
gst::MessageView::StateChanged(state_changed) => {
if let Some(pipeline) = pipeline_weak.upgrade() {
if state_changed.src() == Some(pipeline.upcast_ref()) {
pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!(
"{}-{:?}-to-{:?}",
prefix_clone,
state_changed.old(),
state_changed.current()
),
);
}
}
}
_ => {
let _ = sender.unbounded_send(msg.clone());
}
@ -3038,8 +3056,11 @@ impl BaseWebRTCSink {
pipeline.set_start_time(gst::ClockTime::NONE);
pipeline.set_base_time(element.base_time().unwrap());
let bus = pipeline.bus().unwrap();
let mut bus_stream = CustomBusStream::new(&element, &bus);
let mut bus_stream = CustomBusStream::new(
&element,
&pipeline,
&format!("webrtcsink-session-{session_id}"),
);
let element_clone = element.downgrade();
let pipeline_clone = pipeline.downgrade();
let session_id_clone = session_id.clone();
@ -3064,19 +3085,6 @@ impl BaseWebRTCSink {
);
let _ = this.remove_session(&session_id_clone, true);
}
gst::MessageView::StateChanged(state_changed) => {
if state_changed.src() == Some(pipeline.upcast_ref()) {
pipeline.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!(
"webrtcsink-session-{}-{:?}-to-{:?}",
session_id_clone,
state_changed.old(),
state_changed.current()
),
);
}
}
gst::MessageView::Latency(..) => {
gst::info!(CAT, obj = pipeline, "Recalculating latency");
let _ = pipeline.recalculate_latency();
@ -3653,8 +3661,11 @@ impl BaseWebRTCSink {
.link(&sink)
.with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?;
let bus = pipe.0.bus().unwrap();
let mut stream = CustomBusStream::new(&self.obj(), &bus);
let mut stream = CustomBusStream::new(
&self.obj(),
&pipe.0,
&format!("webrtcsink-discovery-{}", pipe.0.name()),
);
pipe.0
.set_state(gst::State::Playing)
@ -3677,20 +3688,6 @@ impl BaseWebRTCSink {
);
break Err(err.error().into());
}
gst::MessageView::StateChanged(s) => {
if msg.src() == Some(pipe.0.upcast_ref()) {
pipe.0.debug_to_dot_file_with_ts(
gst::DebugGraphDetails::all(),
format!(
"webrtcsink-discovery-{}-{:?}-{:?}",
pipe.0.name(),
s.old(),
s.current()
),
);
}
continue;
}
gst::MessageView::Application(appmsg) => {
let caps = match appmsg.structure() {
Some(s) => {