mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-24 18:08:15 +00:00
webrtcsink: fix assertions when finalizing
Dumping the pipeline on state changes from an async bus handler was triggering criticals. Instead, dump from the sync handler. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1784>
This commit is contained in:
parent
ed8d700e23
commit
0b1d5d8682
1 changed files with 29 additions and 32 deletions
|
@ -134,10 +134,13 @@ struct CustomBusStream {
|
|||
}
|
||||
|
||||
impl CustomBusStream {
|
||||
fn new(bin: &super::BaseWebRTCSink, bus: &gst::Bus) -> Self {
|
||||
fn new(bin: &super::BaseWebRTCSink, pipeline: &gst::Pipeline, prefix: &str) -> Self {
|
||||
let (sender, receiver) = futures::channel::mpsc::unbounded();
|
||||
let bus = pipeline.bus().unwrap();
|
||||
|
||||
let bin_weak = bin.downgrade();
|
||||
let pipeline_weak = pipeline.downgrade();
|
||||
let prefix_clone = prefix.to_string();
|
||||
bus.set_sync_handler(move |_, msg| {
|
||||
match msg.view() {
|
||||
gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => {
|
||||
|
@ -145,6 +148,21 @@ impl CustomBusStream {
|
|||
let _ = bin.post_message(msg.to_owned());
|
||||
}
|
||||
}
|
||||
gst::MessageView::StateChanged(state_changed) => {
|
||||
if let Some(pipeline) = pipeline_weak.upgrade() {
|
||||
if state_changed.src() == Some(pipeline.upcast_ref()) {
|
||||
pipeline.debug_to_dot_file_with_ts(
|
||||
gst::DebugGraphDetails::all(),
|
||||
format!(
|
||||
"{}-{:?}-to-{:?}",
|
||||
prefix_clone,
|
||||
state_changed.old(),
|
||||
state_changed.current()
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
_ => {
|
||||
let _ = sender.unbounded_send(msg.to_owned());
|
||||
}
|
||||
|
@ -2718,8 +2736,11 @@ impl BaseWebRTCSink {
|
|||
pipeline.set_start_time(gst::ClockTime::NONE);
|
||||
pipeline.set_base_time(element.base_time().unwrap());
|
||||
|
||||
let bus = pipeline.bus().unwrap();
|
||||
let mut bus_stream = CustomBusStream::new(&element, &bus);
|
||||
let mut bus_stream = CustomBusStream::new(
|
||||
&element,
|
||||
&pipeline,
|
||||
&format!("webrtcsink-session-{session_id}"),
|
||||
);
|
||||
let element_clone = element.downgrade();
|
||||
let pipeline_clone = pipeline.downgrade();
|
||||
let session_id_clone = session_id.clone();
|
||||
|
@ -2744,19 +2765,6 @@ impl BaseWebRTCSink {
|
|||
);
|
||||
let _ = this.remove_session(&element, &session_id_clone, true);
|
||||
}
|
||||
gst::MessageView::StateChanged(state_changed) => {
|
||||
if state_changed.src() == Some(pipeline.upcast_ref()) {
|
||||
pipeline.debug_to_dot_file_with_ts(
|
||||
gst::DebugGraphDetails::all(),
|
||||
format!(
|
||||
"webrtcsink-session-{}-{:?}-to-{:?}",
|
||||
session_id_clone,
|
||||
state_changed.old(),
|
||||
state_changed.current()
|
||||
),
|
||||
);
|
||||
}
|
||||
}
|
||||
gst::MessageView::Latency(..) => {
|
||||
gst::info!(CAT, obj: pipeline, "Recalculating latency");
|
||||
let _ = pipeline.recalculate_latency();
|
||||
|
@ -3353,8 +3361,11 @@ impl BaseWebRTCSink {
|
|||
.link(&sink)
|
||||
.with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?;
|
||||
|
||||
let bus = pipe.0.bus().unwrap();
|
||||
let mut stream = CustomBusStream::new(element, &bus);
|
||||
let mut stream = CustomBusStream::new(
|
||||
element,
|
||||
&pipe.0,
|
||||
&format!("webrtcsink-discovery-{}", pipe.0.name()),
|
||||
);
|
||||
|
||||
pipe.0
|
||||
.set_state(gst::State::Playing)
|
||||
|
@ -3377,20 +3388,6 @@ impl BaseWebRTCSink {
|
|||
);
|
||||
break Err(err.error().into());
|
||||
}
|
||||
gst::MessageView::StateChanged(s) => {
|
||||
if msg.src() == Some(pipe.0.upcast_ref()) {
|
||||
pipe.0.debug_to_dot_file_with_ts(
|
||||
gst::DebugGraphDetails::all(),
|
||||
format!(
|
||||
"webrtcsink-discovery-{}-{:?}-{:?}",
|
||||
pipe.0.name(),
|
||||
s.old(),
|
||||
s.current()
|
||||
),
|
||||
);
|
||||
}
|
||||
continue;
|
||||
}
|
||||
gst::MessageView::Application(appmsg) => {
|
||||
let caps = match appmsg.structure() {
|
||||
Some(s) => {
|
||||
|
|
Loading…
Reference in a new issue