diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index db1bdef42..7e913c845 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -436,6 +436,7 @@ pub struct Codec { pub is_raw: bool, payload_type: Option, + clock_rate: Option, decoding_info: Option, encoding_info: Option, } @@ -450,8 +451,9 @@ impl Codec { encoders: &glib::List, payloaders: &glib::List, ) -> Self { + let mut clock_rate: Option = None; let has_decoder = Self::has_decoder_for_caps(caps, decoders); - let has_depayloader = Self::has_depayloader_for_codec(name, depayloaders); + let has_depayloader = Self::has_depayloader_for_codec(name, depayloaders, &mut clock_rate); let decoding_info = if has_depayloader && has_decoder { Some(DecodingInfo { @@ -480,6 +482,7 @@ impl Codec { name: name.into(), is_raw: false, payload_type: None, + clock_rate, decoding_info, encoding_info, } @@ -491,7 +494,9 @@ impl Codec { depayloaders: &glib::List, payloaders: &glib::List, ) -> Self { - let decoding_info = if Self::has_depayloader_for_codec(name, depayloaders) { + let mut clock_rate: Option = None; + let decoding_info = if Self::has_depayloader_for_codec(name, depayloaders, &mut clock_rate) + { Some(DecodingInfo { has_decoder: AtomicBool::new(false), }) @@ -523,6 +528,7 @@ impl Codec { name: name.into(), is_raw: true, payload_type: None, + clock_rate, decoding_info, encoding_info, } @@ -632,6 +638,7 @@ impl Codec { fn has_depayloader_for_codec( codec: &str, depayloaders: &glib::List, + clock_rate: &mut Option, ) -> bool { depayloaders.iter().any(|factory| { factory.static_pad_templates().iter().any(|template| { @@ -646,7 +653,22 @@ impl Codec { && s.get::("encoding-name").map_or_else( |_| { if let Ok(encoding_name) = s.get::<&str>("encoding-name") { - encoding_name == codec + if encoding_name == codec { + if s.has_field("clock-rate") { + match s.get_optional::("clock-rate") { + Ok(Some(rate)) => { + *clock_rate = Some(rate); + } + _ => { + // if None or Err or IntRange + *clock_rate = None; + } + }; + } + true + } else { + false + } } else { false } @@ -668,6 +690,10 @@ impl Codec { self.payload_type } + pub fn clock_rate(&self) -> Option { + self.clock_rate + } + pub fn build_encoder(&self) -> Option> { self.encoding_info.as_ref().and_then(|info| { info.encoder.as_ref().map(|encoder| { diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 887bfc8c9..0dc9305c1 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -333,6 +333,7 @@ struct SignallerSignals { request_meta: glib::SignalHandlerId, session_description: glib::SignalHandlerId, handle_ice: glib::SignalHandlerId, + session_requested: glib::SignalHandlerId, } impl Session { @@ -818,12 +819,148 @@ impl Session { ghostpad } + fn generate_offer(&self, element: &super::BaseWebRTCSrc) { + let sess_id = self.id.clone(); + let webrtcbin = self.webrtcbin(); + let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly; + let mut pt = 96..127; + let settings = element.imp().settings.lock().unwrap(); + let caps = settings + .video_codecs + .iter() + .chain(settings.audio_codecs.iter()) + .map(|codec| { + let name = &codec.name; + + let Some(pt) = pt.next() else { + gst::warning!( + CAT, + obj = element, + "exhausted the list of dynamic payload types, not adding transceiver for {name}" + ); + return None; + }; + + let (media, clock_rate) = if codec.is_video() { + ("video", codec.clock_rate().unwrap_or(90000)) + } else { + ("audio", codec.clock_rate().unwrap_or(48000)) + }; + + let mut caps = gst::Caps::new_empty(); + { + let caps = caps.get_mut().unwrap(); + let mut s = gst::Structure::builder("application/x-rtp") + .field("media", media) + .field("payload", pt) + .field("encoding-name", name.as_str()) + .field("clock-rate", clock_rate) + .build(); + + if name.eq_ignore_ascii_case("H264") { + // support the constrained-baseline profile for now + // TODO: extend this to other supported profiles by querying the decoders + s.set("profile-level-id", "42e016"); + } + + caps.append_structure(s); + } + Some(caps) + }); + + for c in caps.flatten() { + gst::info!(CAT, obj = element, "Adding transceiver with caps: {c:#?}"); + let transceiver = webrtcbin.emit_by_name::( + "add-transceiver", + &[&direction, &c], + ); + + transceiver.set_property("do_nack", settings.do_retransmission); + transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); + } + + let webrtcbin_weak = webrtcbin.downgrade(); + let promise = gst::Promise::with_change_func(glib::clone!( + #[weak (rename_to = ele)] + element, + move |reply| { + let Some(webrtcbin) = webrtcbin_weak.upgrade() else { + gst::error!(CAT, obj = ele, "generate offer::failed to get webrtcbin"); + ele.imp().signaller().end_session(sess_id.as_str()); + return; + }; + + let reply = match reply { + Ok(Some(reply)) => reply, + Ok(None) => { + gst::error!( + CAT, + obj = ele, + "generate offer::Promise returned with no reply" + ); + ele.imp().signaller().end_session(sess_id.as_str()); + return; + } + Err(e) => { + gst::error!( + CAT, + obj = ele, + "generate offer::Promise returned with error {:?}", + e + ); + ele.imp().signaller().end_session(sess_id.as_str()); + return; + } + }; + + if let Ok(offer_sdp) = reply + .value("offer") + .map(|offer| offer.get::().unwrap()) + { + webrtcbin.emit_by_name::<()>( + "set-local-description", + &[&offer_sdp, &None::], + ); + + gst::log!( + CAT, + obj = ele, + "Sending SDP, {}", + offer_sdp.sdp().to_string() + ); + + let signaller = ele.imp().signaller(); + signaller.send_sdp(sess_id.as_str(), &offer_sdp); + } else { + let error = reply + .value("error") + .expect("structure must have an error value") + .get::() + .expect("value must be a GLib error"); + + gst::error!( + CAT, + obj = ele, + "generate offer::Promise returned with error: {}", + error + ); + ele.imp().signaller().end_session(sess_id.as_str()); + } + } + )); + + webrtcbin + .clone() + .emit_by_name::<()>("create-offer", &[&None::, &promise]); + } + fn remote_description_set( &mut self, element: &super::BaseWebRTCSrc, - offer: &gst_webrtc::WebRTCSessionDescription, + desc: &gst_webrtc::WebRTCSessionDescription, ) -> (gst::Promise, gst::Bin) { - let sdp = offer.sdp(); + let sdp = desc.sdp(); + let desc_type = desc.type_(); let webrtcbin = self.webrtcbin(); for (i, media) in sdp.medias().enumerate() { let (codec_names, do_retransmission) = { @@ -890,40 +1027,53 @@ impl Session { .imp() .create_and_probe_src_pad(&caps, &stream_id, self) { - gst::info!( + if desc_type == gst_webrtc::WebRTCSDPType::Offer { + gst::info!( CAT, obj = element, "Getting transceiver for {stream_id} and index {i} with caps: {caps:#?}" - ); - let mut transceiver = None; - let mut idx = 0i32; - // find the transceiver with this mline - loop { - let Some(to_check) = webrtcbin - .emit_by_name::>( - "get-transceiver", - &[&idx], - ) - else { - break; - }; - let mline = to_check.property::("mlineindex"); - if mline as usize == i { - transceiver = Some(to_check); - break; + ); + + let mut transceiver = None; + let mut idx = 0i32; + // find the transceiver with this mline + loop { + let Some(to_check) = webrtcbin + .emit_by_name::>( + "get-transceiver", + &[&idx], + ) + else { + break; + }; + let mline = to_check.property::("mlineindex"); + if mline as usize == i { + transceiver = Some(to_check); + break; + } + idx += 1; } - idx += 1; - } - let transceiver = transceiver.unwrap_or_else(|| { + let transceiver = transceiver.unwrap_or_else(|| { gst::warning!(CAT, "Transceiver for idx {i} does not exist, GStreamer <= 1.24, adding it ourself"); webrtcbin.emit_by_name::( "add-transceiver", &[&gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly, &caps]) - }); + }); - transceiver.set_property("do_nack", do_retransmission); - transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); - transceiver.set_property("codec-preferences", caps); + transceiver.set_property("do_nack", do_retransmission); + transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); + transceiver.set_property("codec-preferences", caps); + } else { + // SDP type is answer, + // so the transceiver must have already been created while sending offer + } + } else { + gst::error!( + CAT, + obj = element, + "Failed to create src pad with caps {:?}", + caps + ); } } else { gst::info!( @@ -940,53 +1090,78 @@ impl Session { #[strong(rename_to = session_id)] self.id, move |reply| { - let state = element.imp().state.lock().unwrap(); - gst::info!(CAT, obj = element, "got answer for session {session_id:?}"); - let Some(session) = state.sessions.get(&session_id) else { - gst::error!(CAT, obj = element, "no session {session_id:?}"); + if desc_type == gst_webrtc::WebRTCSDPType::Offer { + let state = element.imp().state.lock().unwrap(); + gst::info!(CAT, obj = element, "got answer for session {session_id:?}"); + let Some(session) = state.sessions.get(&session_id) else { + gst::error!(CAT, obj = element, "no session {session_id:?}"); + return; + }; + session.on_answer_created(reply, &element); + } else { + gst::log!( + CAT, + obj = element, + "Nothing to do in the promise in case of an answer" + ); return; - }; - session.on_answer_created(reply, &element); + } } )); (promise, webrtcbin.clone()) } - fn handle_offer( + fn handle_remote_description( &mut self, - offer: &gst_webrtc::WebRTCSessionDescription, + desc: &gst_webrtc::WebRTCSessionDescription, element: &super::BaseWebRTCSrc, ) -> (gst::Promise, gst::Bin) { - gst::log!(CAT, obj = element, "Got offer {}", offer.sdp().to_string()); + gst::debug!( + CAT, + obj = element, + "Got remote description: {}", + desc.sdp().to_string() + ); let promise = gst::Promise::with_change_func(glib::clone!( #[weak] element, #[strong] - offer, + desc, #[strong(rename_to = session_id)] self.id, move |_| { let mut state = element.imp().state.lock().unwrap(); - gst::info!(CAT, obj = element, "got offer for session {session_id:?}"); + gst::info!( + CAT, + obj = element, + "got {:?} for session {session_id:?}", + desc.type_() + ); let Some(session) = state.sessions.get_mut(&session_id) else { gst::error!(CAT, obj = element, "no session {session_id:?}"); return; }; - let (promise, webrtcbin) = session.remote_description_set(&element, &offer); + let (promise, webrtcbin) = session.remote_description_set(&element, &desc); drop(state); - webrtcbin.emit_by_name::<()>("create-answer", &[&None::, &promise]); + if desc.type_() == gst_webrtc::WebRTCSDPType::Offer { + webrtcbin + .emit_by_name::<()>("create-answer", &[&None::, &promise]); + } else { + // Nothing to do with the promise in case of an answer + promise.reply(None); + } } )); // We cannot emit `set-remote-description` from here. The promise // function needs the state lock which is held by the caller - // of `handle_offer`. So return the promise to the caller so that - // the it can drop the `state` and safely emit `set-remote-description` + // of `handle_remote_description`. So return the promise to the caller so that + // it can drop the `state` and safely emit `set-remote-description` (promise, self.webrtcbin().clone()) } @@ -1225,6 +1400,22 @@ impl BaseWebRTCSrc { ), ), + session_requested: signaller.connect_closure( + "session-requested", + false, + glib::closure!(#[watch] instance, move |_signaler: glib::Object, session_id: &str, _peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{ + if offer.is_none() { + let this = instance.imp(); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp = this, "session {session_id:?} not found"); + return + }; + session.generate_offer(&this.obj()); + } + }), + ), + request_meta: signaller.connect_closure( "request-meta", false, @@ -1246,20 +1437,26 @@ impl BaseWebRTCSrc { move |_signaller: glib::Object, session_id: &str, desc: &gst_webrtc::WebRTCSessionDescription| { - assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer); - let this = instance.imp(); - gst::info!(CAT, imp = this, "got sdp offer"); - let mut state = this.state.lock().unwrap(); - let Some(session) = state.sessions.get_mut(session_id) else { - gst::error!(CAT, imp = this, "session {session_id:?} not found"); - return; - }; + match desc.type_() { + gst_webrtc::WebRTCSDPType::Offer | gst_webrtc::WebRTCSDPType::Answer => { + let this = instance.imp(); + gst::info!(CAT, imp = this, "got sdp : {:?}", desc.type_()); + let mut state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get_mut(session_id) else { + gst::error!(CAT, imp = this, "session {session_id:?} not found"); + return; + }; - let (promise, webrtcbin) = session.handle_offer(desc, &this.obj()); - drop(state); - webrtcbin - .emit_by_name::<()>("set-remote-description", &[&desc, &promise]); + let (promise, webrtcbin) = session.handle_remote_description(desc, &this.obj()); + drop(state); + webrtcbin + .emit_by_name::<()>("set-remote-description", &[&desc, &promise]); + }, + _ => { + unimplemented!("{:?} type remote description not handled", desc.type_()); + }, } + } ), ),