diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index ed976a19..c07ffb2f 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -292,6 +292,7 @@ struct SignallerSignals { request_meta: glib::SignalHandlerId, session_description: glib::SignalHandlerId, handle_ice: glib::SignalHandlerId, + session_requested: glib::SignalHandlerId, } impl Session { @@ -572,6 +573,210 @@ impl Session { } } + fn generate_offer(&self, element: &super::BaseWebRTCSrc) { + let sess_id = self.id.clone(); + let webrtcbin = self.webrtcbin(); + let direction = gst_webrtc::WebRTCRTPTransceiverDirection::Recvonly; + + let settings = element.imp().settings.lock().unwrap(); + let caps = settings + .video_codecs + .iter() + .chain(settings.audio_codecs.iter()) + .map(|codec| { + let name = &codec.name; + + let (media, clock_rate, pt) = if codec.stream_type == gst::StreamType::AUDIO { + ("audio", 48000, 96) + } else { + //video stream type + ("video", 90000, 101) + }; + + let mut caps = gst::Caps::new_empty(); + { + let caps = caps.get_mut().unwrap(); + let s = gst::Structure::builder("application/x-rtp") + .field("media", media) + .field("payload", pt) + .field("encoding-name", name.as_str()) + .field("clock-rate", clock_rate) + .build(); + caps.append_structure(s); + } + caps + }); + + for c in caps { + gst::info!( + CAT, + obj: element, + "Adding transceiver with caps: {c:#?}" + ); + let transceiver = webrtcbin.emit_by_name::( + "add-transceiver", + &[&direction, &c], + ); + + transceiver.set_property("do_nack", settings.do_retransmission); + transceiver.set_property("fec-type", gst_webrtc::WebRTCFECType::UlpRed); + } + + let webrtcbin_weak = webrtcbin.downgrade(); + let promise = gst::Promise::with_change_func( + glib::clone!(@weak element as ele => move |reply| { + + let Some(webrtcbin) = webrtcbin_weak.upgrade() else { + return; + }; + + let reply = match reply { + Ok(Some(reply)) => reply, + Ok(None) => { + gst::error!(CAT, obj: ele, "generate offer::Promise returned with no reply"); + return; + } + Err(e) => { + gst::error!(CAT, obj: ele, "generate offer::Promise returned with error {:?}", e); + return; + } + }; + + if let Ok(offer_sdp) = reply + .value("offer") + .map(|offer| offer.get::().unwrap()) + { + gst::debug!( + CAT, + obj: ele, + "Setting local description: {}", + offer_sdp.sdp().to_string() + ); + + webrtcbin.emit_by_name::<()>( + "set-local-description", + &[&offer_sdp, &None::], + ); + + gst::log!(CAT, obj: ele, "Sending SDP, {}", offer_sdp.sdp().to_string()); + let signaller = ele.imp().signaller(); + signaller.send_sdp(sess_id.as_str(), &offer_sdp); + } else { + let error = reply + .value("error") + .expect("structure must have an error value") + .get::() + .expect("value must be a GLib error"); + + gst::error!(CAT, obj: ele, "generate offer::Promise returned with error: {}", error); + } + }), + ); + + webrtcbin + .clone() + .emit_by_name::<()>("create-offer", &[&None::, &promise]); + } + + fn handle_answer( + &self, + answer: &gst_webrtc::WebRTCSessionDescription, + element: &super::BaseWebRTCSrc, + ) { + //FIXME: refactor the common parts of this function and handle_offer() + gst::debug!( + CAT, + obj: element, + "Setting remote description: {}", + answer.sdp().to_string() + ); + + let webrtcbin = self.webrtcbin(); + for (i, media) in answer.sdp().medias().enumerate() { + let codec_names = { + let settings = element.imp().settings.lock().unwrap(); + settings + .video_codecs + .iter() + .chain(settings.audio_codecs.iter()) + .map(|codec| codec.name.clone()) + .collect::>() + }; + let caps = media + .formats() + .filter_map(|format| { + format.parse::().ok().and_then(|pt| { + let mut mediacaps = media.caps_from_media(pt)?; + let s = mediacaps.structure(0).unwrap(); + if !codec_names.contains(s.get::<&str>("encoding-name").ok()?) { + return None; + } + + // filter the remote media whose direction is not sendonly + media.attribute_val("sendonly")?; + + let mut filtered_s = gst::Structure::new_empty("application/x-rtp"); + filtered_s.extend(s.iter().filter_map(|(key, value)| { + if key.starts_with("rtcp-") { + None + } else { + Some((key, value.to_owned())) + } + })); + + if media + .attributes_to_caps(mediacaps.get_mut().unwrap()) + .is_err() + { + gst::warning!( + CAT, + obj: element, + "Failed to retrieve attributes from media!" + ); + return None; + } + + let s = mediacaps.structure(0).unwrap(); + + filtered_s.extend(s.iter().filter_map(|(key, value)| { + if key.starts_with("extmap-") { + return Some((key, value.to_owned())); + } + + None + })); + + Some(filtered_s) + }) + }) + .collect::(); + + if !caps.is_empty() { + let stream_id = self.get_stream_id(None, Some(i as u32)).unwrap(); + if !element + .imp() + .create_and_probe_src_pad(&caps, &stream_id, self) + { + gst::error!( + CAT, + obj: element, + "Failed to create src pad with caps {:?}", + caps + ); + } + } else { + gst::info!( + CAT, + obj: element, + "Not using media: {media:#?} as it doesn't match our codec restrictions" + ); + } + } + + webrtcbin.emit_by_name::<()>("set-remote-description", &[&answer, &None::]); + + } + fn handle_offer( &self, offer: &gst_webrtc::WebRTCSessionDescription, @@ -887,18 +1092,49 @@ impl BaseWebRTCSrc { _signaller: glib::Object, session_id: &str, desc: &gst_webrtc::WebRTCSessionDescription| { - assert_eq!(desc.type_(), gst_webrtc::WebRTCSDPType::Offer); - let this = instance.imp(); - gst::info!(CAT, imp: this, "got sdp offer"); - let state = this.state.lock().unwrap(); - let Some(session) = state.sessions.get(session_id) else { - gst::error!(CAT, imp: this, "session {session_id:?} not found"); - return - }; + match desc.type_() { + gst_webrtc::WebRTCSDPType::Offer =>{ + let this = instance.imp(); + gst::info!(CAT, imp: this, "got sdp offer"); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; - let (promise, webrtcbin) = session.handle_offer(desc, &this.obj()); - drop (state); - webrtcbin.emit_by_name::<()>("create-answer", &[&None::, &promise]); + let (promise, webrtcbin) = session.handle_offer(desc, &this.obj()); + drop (state); + webrtcbin.emit_by_name::<()>("create-answer", &[&None::, &promise]); + }, + + gst_webrtc::WebRTCSDPType::Answer => { + let this = instance.imp(); + gst::info!(CAT, imp: this, "got sdp answer"); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.handle_answer(desc, &this.obj()); + }, + _ => {}, + } + }), + ), + + session_requested: signaller.connect_closure( + "session-requested", + false, + glib::closure!(@watch instance => move |_signaler: glib::Object, session_id: &str, _peer_id: &str, offer: Option<&gst_webrtc::WebRTCSessionDescription>|{ + if offer.is_none() { + let this = instance.imp(); + let state = this.state.lock().unwrap(); + let Some(session) = state.sessions.get(session_id) else { + gst::error!(CAT, imp: this, "session {session_id:?} not found"); + return + }; + session.generate_offer(&this.obj()); + } }), ), @@ -995,13 +1231,15 @@ impl BaseWebRTCSrc { fn maybe_start_signaller(&self) { let obj = self.obj(); - let mut state = self.state.lock().unwrap(); + let state = self.state.lock().unwrap(); if state.signaller_state == SignallerState::Stopped && obj.current_state() >= gst::State::Paused { + drop(state); self.signaller().start(); gst::info!(CAT, imp: self, "Started signaller"); + let mut state = self.state.lock().unwrap(); state.signaller_state = SignallerState::Started; } }