diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 7c971952f..4a63ead2c 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -1335,22 +1335,9 @@ impl SessionInner { element: &super::BaseWebRTCSink, producer: &StreamProducer, webrtc_pad: &WebRTCPad, - codecs: &BTreeMap, + session_setup_result: SessionSetupResult, ) -> Result<(), Error> { - // No stream name, pad only exists to deactivate media - let stream_name = match webrtc_pad.stream_name { - Some(ref name) => name, - None => { - gst::info!( - CAT, - obj = element, - "Consumer {} not connecting any input stream for inactive media {}", - self.peer_id, - webrtc_pad.media_idx - ); - return Ok(()); - } - }; + let (appsrc, encoding_chain, caps, codec, stream_name) = session_setup_result; gst::info!( CAT, @@ -1361,79 +1348,9 @@ impl SessionInner { webrtc_pad.media_idx ); - let payload = webrtc_pad.payload.unwrap(); - - let codec = match self.codecs { - Some(ref codecs) => { - gst::debug!(CAT, obj = element, "Picking codec from remote offer"); - - codecs - .get(&payload) - .cloned() - .ok_or_else(|| anyhow!("No codec for payload {}", payload))? - } - None => { - gst::debug!(CAT, obj = element, "Picking codec from local offer"); - - codecs - .get(&payload) - .cloned() - .ok_or_else(|| anyhow!("No codec for payload {}", payload))? - } - }; - - let appsrc = make_element("appsrc", Some(stream_name))?; - self.pipeline.add(&appsrc).unwrap(); - let pay_filter = make_element("capsfilter", None)?; self.pipeline.add(&pay_filter).unwrap(); - let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any); - - let PayloadChain { - payloader, - encoding_chain, - } = PayloadChainBuilder::new( - &webrtc_pad.in_caps, - &output_caps, - &codec, - element.emit_by_name::>( - "request-encoded-filter", - &[&Some(&self.peer_id), &stream_name, &codec.caps], - ), - ) - .build(&self.pipeline, &appsrc)?; - - if let Some(ref enc) = encoding_chain.encoder { - element.emit_by_name::("encoder-setup", &[&self.peer_id, &stream_name, &enc]); - } - - let sdp = self.sdp.as_ref().unwrap(); - let sdp_media = sdp.media(webrtc_pad.media_idx).unwrap(); - - let mut global_caps = gst::Caps::new_empty_simple("application/x-unknown"); - - sdp.attributes_to_caps(global_caps.get_mut().unwrap()) - .unwrap(); - sdp_media - .attributes_to_caps(global_caps.get_mut().unwrap()) - .unwrap(); - - let caps = sdp_media - .caps_from_media(payload) - .unwrap() - .intersect(&global_caps); - - element.imp().configure_payloader( - &self.peer_id, - stream_name, - &payloader, - &codec, - Some(webrtc_pad.ssrc), - Some(&caps), - ExtensionConfigurationType::Skip, - )?; - // At this point, the peer has provided its answer, and we want to // let the payloader / encoder perform negotiation according to that. // @@ -1483,7 +1400,7 @@ impl SessionInner { &self.id, codec.caps.structure(0).unwrap().name(), transceiver, - stream_name.clone(), + stream_name.to_string(), ) { match self.cc_info.heuristic { WebRTCSinkCongestionControl::Disabled => { @@ -1715,6 +1632,8 @@ enum ExtensionConfigurationType { Apply { twcc_id: u32 }, } +type SessionSetupResult = (gst::Element, EncodingChain, gst::Caps, Codec, String); + impl BaseWebRTCSink { fn configure_congestion_control( &self, @@ -3625,6 +3544,93 @@ impl BaseWebRTCSink { } } + fn setup_session_payloading_chain( + &self, + peer_id: &str, + webrtc_pad: &WebRTCPad, + codecs: &BTreeMap, + sdp: &gst_sdp::SDPMessage, + pipeline: &gst::Pipeline, + ) -> Result, Error> { + let stream_name = match webrtc_pad.stream_name { + Some(ref name) => name, + None => { + gst::info!( + CAT, + imp = self, + "Consumer {} not connecting any input stream for inactive media {}", + peer_id, + webrtc_pad.media_idx + ); + return Ok(None); + } + }; + + let appsrc = make_element("appsrc", Some(stream_name))?; + pipeline.add(&appsrc).unwrap(); + + let payload = webrtc_pad.payload.unwrap(); + + let codec = codecs + .get(&payload) + .cloned() + .ok_or_else(|| anyhow!("No codec for payload {}", payload))?; + + let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any); + + let PayloadChain { + payloader, + encoding_chain, + } = PayloadChainBuilder::new( + &webrtc_pad.in_caps, + &output_caps, + &codec, + self.obj().emit_by_name::>( + "request-encoded-filter", + &[&Some(peer_id), &stream_name, &codec.caps], + ), + ) + .build(pipeline, &appsrc)?; + + if let Some(ref enc) = encoding_chain.encoder { + self.obj() + .emit_by_name::("encoder-setup", &[&peer_id.to_string(), &stream_name, &enc]); + } + + let sdp_media = sdp.media(webrtc_pad.media_idx).unwrap(); + + let mut global_caps = gst::Caps::new_empty_simple("application/x-unknown"); + + sdp.attributes_to_caps(global_caps.get_mut().unwrap()) + .unwrap(); + sdp_media + .attributes_to_caps(global_caps.get_mut().unwrap()) + .unwrap(); + + let caps = sdp_media + .caps_from_media(payload) + .unwrap() + .intersect(&global_caps); + + self.configure_payloader( + peer_id, + stream_name, + &payloader, + &codec, + Some(webrtc_pad.ssrc), + Some(&caps), + ExtensionConfigurationType::Skip, + )?; + + Ok(Some(( + appsrc, + encoding_chain, + caps, + codec.clone(), + stream_name.to_owned(), + ))) + } + fn on_remote_description_set(&self, session_id: &str) { let mut state_guard = self.state.lock().unwrap(); let mut state = state_guard.deref_mut(); @@ -3664,8 +3670,51 @@ impl BaseWebRTCSink { .and_then(|stream| stream.producer.clone()) { drop(state_guard); + + let peer_id = session.peer_id.clone(); + let session_codecs = session.codecs.clone().unwrap_or_else(|| codecs.clone()); + let sdp = session.sdp.clone(); + let pipeline = session.pipeline.clone(); + + drop(session); + + let res = match self.setup_session_payloading_chain( + &peer_id, + webrtc_pad, + &session_codecs, + sdp.as_ref().unwrap(), + &pipeline, + ) { + Err(err) => { + gst::error!( + CAT, + imp = self, + "Failed to setup elements {} for session {}: {}", + stream_name, + session_id, + err + ); + remove = true; + session = session_clone.lock().unwrap(); + state_guard = self.state.lock().unwrap(); + state = state_guard.deref_mut(); + break; + } + Ok(Some(res)) => res, + Ok(None) => { + session = session_clone.lock().unwrap(); + state_guard = self.state.lock().unwrap(); + state = state_guard.deref_mut(); + continue; + } + }; + + session = session_clone.lock().unwrap(); + state_guard = self.state.lock().unwrap(); + state = state_guard.deref_mut(); + if let Err(err) = - session.connect_input_stream(&self.obj(), &producer, webrtc_pad, &codecs) + session.connect_input_stream(&self.obj(), &producer, webrtc_pad, res) { gst::error!( CAT, @@ -3676,14 +3725,8 @@ impl BaseWebRTCSink { err ); remove = true; - state_guard = self.state.lock().unwrap(); - state = state_guard.deref_mut(); break; } - drop(session); - state_guard = self.state.lock().unwrap(); - state = state_guard.deref_mut(); - session = session_clone.lock().unwrap(); } else { gst::error!( CAT,