diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index c7cb6341..3100007c 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -167,10 +167,13 @@ struct CustomBusStream { } impl CustomBusStream { - fn new(bin: &super::BaseWebRTCSink, bus: &gst::Bus) -> Self { + fn new(bin: &super::BaseWebRTCSink, pipeline: &gst::Pipeline, prefix: &str) -> Self { let (sender, receiver) = futures::channel::mpsc::unbounded(); + let bus = pipeline.bus().unwrap(); let bin_weak = bin.downgrade(); + let pipeline_weak = pipeline.downgrade(); + let prefix_clone = prefix.to_string(); bus.set_sync_handler(move |_, msg| { match msg.view() { gst::MessageView::NeedContext(..) | gst::MessageView::HaveContext(..) => { @@ -178,6 +181,21 @@ impl CustomBusStream { let _ = bin.post_message(msg.clone()); } } + gst::MessageView::StateChanged(state_changed) => { + if let Some(pipeline) = pipeline_weak.upgrade() { + if state_changed.src() == Some(pipeline.upcast_ref()) { + pipeline.debug_to_dot_file_with_ts( + gst::DebugGraphDetails::all(), + format!( + "{}-{:?}-to-{:?}", + prefix_clone, + state_changed.old(), + state_changed.current() + ), + ); + } + } + } _ => { let _ = sender.unbounded_send(msg.clone()); } @@ -3038,8 +3056,11 @@ impl BaseWebRTCSink { pipeline.set_start_time(gst::ClockTime::NONE); pipeline.set_base_time(element.base_time().unwrap()); - let bus = pipeline.bus().unwrap(); - let mut bus_stream = CustomBusStream::new(&element, &bus); + let mut bus_stream = CustomBusStream::new( + &element, + &pipeline, + &format!("webrtcsink-session-{session_id}"), + ); let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); let session_id_clone = session_id.clone(); @@ -3064,19 +3085,6 @@ impl BaseWebRTCSink { ); let _ = this.remove_session(&session_id_clone, true); } - gst::MessageView::StateChanged(state_changed) => { - if state_changed.src() == Some(pipeline.upcast_ref()) { - pipeline.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - format!( - "webrtcsink-session-{}-{:?}-to-{:?}", - session_id_clone, - state_changed.old(), - state_changed.current() - ), - ); - } - } gst::MessageView::Latency(..) => { gst::info!(CAT, obj = pipeline, "Recalculating latency"); let _ = pipeline.recalculate_latency(); @@ -3653,8 +3661,11 @@ impl BaseWebRTCSink { .link(&sink) .with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?; - let bus = pipe.0.bus().unwrap(); - let mut stream = CustomBusStream::new(&self.obj(), &bus); + let mut stream = CustomBusStream::new( + &self.obj(), + &pipe.0, + &format!("webrtcsink-discovery-{}", pipe.0.name()), + ); pipe.0 .set_state(gst::State::Playing) @@ -3677,20 +3688,6 @@ impl BaseWebRTCSink { ); break Err(err.error().into()); } - gst::MessageView::StateChanged(s) => { - if msg.src() == Some(pipe.0.upcast_ref()) { - pipe.0.debug_to_dot_file_with_ts( - gst::DebugGraphDetails::all(), - format!( - "webrtcsink-discovery-{}-{:?}-{:?}", - pipe.0.name(), - s.old(), - s.current() - ), - ); - } - continue; - } gst::MessageView::Application(appmsg) => { let caps = match appmsg.structure() { Some(s) => {