diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 49c8deb6..9b50e7ea 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -162,6 +162,7 @@ impl CustomBusStream { ); } } + let _ = sender.unbounded_send(msg.to_owned()); } _ => { let _ = sender.unbounded_send(msg.to_owned()); @@ -2744,6 +2745,8 @@ impl BaseWebRTCSink { let element_clone = element.downgrade(); let pipeline_clone = pipeline.downgrade(); let session_id_clone = session_id.clone(); + let offer_clone = offer.cloned(); + let peer_id_clone = peer_id.clone(); RUNTIME.spawn(async move { while let Some(msg) = bus_stream.next().await { @@ -2765,6 +2768,38 @@ impl BaseWebRTCSink { ); let _ = this.remove_session(&element, &session_id_clone, true); } + gst::MessageView::StateChanged(state_changed) => { + if state_changed.src() == Some(pipeline.upcast_ref()) + && state_changed.old() == gst::State::Ready + && state_changed.current() == gst::State::Paused + { + gst::info!( + CAT, + obj: pipeline, + "{peer_id_clone} pipeline reached PAUSED, negotiating" + ); + // We don't connect to on-negotiation-needed, this in order to call the above + // signal without holding the state lock: + // + // Going to Ready triggers synchronous emission of the on-negotiation-needed + // signal, during which time the application may add a data channel, causing + // renegotiation, which we do not support at this time. + // + // This is completely safe, as we know that by now all conditions are gathered: + // webrtcbin is in the Paused state, and all its transceivers have codec_preferences. + this.negotiate(&element, &session_id_clone, offer_clone.as_ref()); + + if let Err(err) = pipeline.set_state(gst::State::Playing) { + gst::warning!( + CAT, + obj: element, + "Failed to bring {peer_id_clone} pipeline to PLAYING: {}", + err + ); + let _ = this.remove_session(&element, &session_id_clone, true); + } + } + } gst::MessageView::Latency(..) => { gst::info!(CAT, obj: pipeline, "Recalculating latency"); let _ = pipeline.recalculate_latency(); @@ -2897,22 +2932,15 @@ impl BaseWebRTCSink { signaller.emit_by_name::<()>("consumer-added", &[&peer_id, &webrtcbin]); signaller.emit_by_name::<()>("webrtcbin-ready", &[&peer_id, &webrtcbin]); - // We don't connect to on-negotiation-needed, this in order to call the above - // signal without holding the state lock: - // - // Going to Ready triggers synchronous emission of the on-negotiation-needed - // signal, during which time the application may add a data channel, causing - // renegotiation, which we do not support at this time. - // - // This is completely safe, as we know that by now all conditions are gathered: - // webrtcbin is in the Ready state, and all its transceivers have codec_preferences. - this.negotiate(&element, &session_id, offer_clone.as_ref()); - - if let Err(err) = pipeline.set_state(gst::State::Playing) { + // We now bring the state to PAUSED before negotiating, as connecting + // input streams before that can create a race condition with our + // configuring of the format on the app sources and the start() + // implementation of base source resetting the format to bytes. + if let Err(err) = pipeline.set_state(gst::State::Paused) { gst::warning!( CAT, obj: element, - "Failed to bring {peer_id} pipeline to PLAYING: {}", + "Failed to bring {peer_id} pipeline to PAUSED: {}", err ); let _ = this.remove_session(&element, &session_id, true);