From 3fc0326084d0c058040192309422fb69211b30ad Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Thu, 27 Oct 2022 22:00:57 +0530 Subject: [PATCH] webrtchttp: whipsink: Add candidates when sending the offer WHIP endpoint providers like Cloudflare do not support Trickle ICE and need candidates to be send along with the initial offer. Instead of sending the offer in create-offer promise, send it once the ICE candidates have been gathered. While at it add properties to set STUN and TURN server along with the ICE transport policy as at least when testing the Cloudflare WHIP endpoint seems unreachable without it. This has also been observed with Cloudflare provided demos. Part-of: --- docs/plugins/gst_plugins_cache.json | 50 ++++++++ net/webrtchttp/src/whipsink/imp.rs | 171 +++++++++++++++++++++++----- 2 files changed, 192 insertions(+), 29 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index aa5e841a..0029d3ee 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7924,6 +7924,56 @@ "type": "gchararray", "writable": true }, + "ice-transport-policy": { + "blurb": "The policy to apply for ICE transport", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "all (0)", + "mutable": "null", + "readable": true, + "type": "GstRsWebRTCICETransportPolicy", + "writable": true + }, + "stun-server": { + "blurb": "The STUN server of the form stun://hostname:port", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "timeout": { + "blurb": "Value in seconds to timeout WHIP endpoint requests (0 = No timeout).", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "15", + "max": "3600", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, + "turn-server": { + "blurb": "The TURN server of the form turn(s)://username:password@host:port.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, "use-link-headers": { "blurb": "Use link headers to configure ice-servers from the WHIP server response to the POST or OPTIONS request.\n If set to TRUE and the WHIP server returns valid ice-servers,\n this property overrides the ice-servers values set using the stun-server and turn-server properties.", "conditionally-available": false, diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index 4677824b..ee712a3c 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -8,7 +8,9 @@ // SPDX-License-Identifier: MPL-2.0 use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait}; +use crate::GstRsWebRTCICETransportPolicy; use futures::future; +use gst::element_imp_error; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; @@ -25,6 +27,8 @@ static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink")) }); +const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy = + GstRsWebRTCICETransportPolicy::All; const MAX_REDIRECTS: u8 = 10; #[derive(Debug, Clone)] @@ -32,6 +36,9 @@ struct Settings { whip_endpoint: Option, use_link_headers: bool, auth_token: Option, + turn_server: Option, + stun_server: Option, + ice_transport_policy: GstRsWebRTCICETransportPolicy, } #[allow(clippy::derivable_impls)] @@ -41,6 +48,9 @@ impl Default for Settings { whip_endpoint: None, use_link_headers: false, auth_token: None, + stun_server: None, + turn_server: None, + ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, } } } @@ -206,7 +216,22 @@ impl ObjectImpl for WhipSink { .nick("Authorization Token") .blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer '") .mutable_ready() - .build() + .build(), + + glib::ParamSpecString::builder("stun-server") + .nick("STUN Server") + .blurb("The STUN server of the form stun://hostname:port") + .build(), + + glib::ParamSpecString::builder("turn-server") + .nick("TURN Server") + .blurb("The TURN server of the form turn(s)://username:password@host:port.") + .build(), + + glib::ParamSpecEnum::builder::("ice-transport-policy", DEFAULT_ICE_TRANSPORT_POLICY) + .nick("ICE transport policy") + .blurb("The policy to apply for ICE transport") + .build(), ] }); PROPERTIES.as_ref() @@ -228,6 +253,36 @@ impl ObjectImpl for WhipSink { let mut settings = self.settings.lock().unwrap(); settings.auth_token = value.get().expect("Auth token should be a string"); } + "stun-server" => { + let mut settings = self.settings.lock().unwrap(); + settings.stun_server = value + .get::>() + .expect("type checked upstream"); + self.webrtcbin + .set_property("stun-server", settings.stun_server.as_ref()); + } + "turn-server" => { + let mut settings = self.settings.lock().unwrap(); + settings.turn_server = value + .get::>() + .expect("type checked upstream"); + self.webrtcbin + .set_property("turn-server", settings.turn_server.as_ref()); + } + "ice-transport-policy" => { + let mut settings = self.settings.lock().unwrap(); + settings.ice_transport_policy = value + .get::() + .expect("ice-transport-policy should be an enum value"); + + if settings.ice_transport_policy == GstRsWebRTCICETransportPolicy::Relay { + self.webrtcbin + .set_property_from_str("ice-transport-policy", "relay"); + } else { + self.webrtcbin + .set_property_from_str("ice-transport-policy", "all"); + } + } _ => unimplemented!(), } } @@ -246,6 +301,18 @@ impl ObjectImpl for WhipSink { let settings = self.settings.lock().unwrap(); settings.auth_token.to_value() } + "stun-server" => { + let settings = self.settings.lock().unwrap(); + settings.stun_server.to_value() + } + "turn-server" => { + let settings = self.settings.lock().unwrap(); + settings.turn_server.to_value() + } + "ice-transport-policy" => { + let settings = self.settings.lock().unwrap(); + settings.ice_transport_policy.to_value() + } _ => unimplemented!(), } } @@ -256,12 +323,34 @@ impl ObjectImpl for WhipSink { let obj = self.obj(); obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); obj.set_element_flags(gst::ElementFlags::SINK); - obj.add(&self.webrtcbin).unwrap(); // The spec requires all m= lines to be bundled (section 4.2) self.webrtcbin .set_property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle); + let self_weak = self.downgrade(); + self.webrtcbin + .connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { + let self_ = match self_weak.upgrade() { + Some(self_) => self_, + None => return, + }; + + let state = webrtcbin.property::("ice-gathering-state"); + + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, imp: self_, "ICE gathering started") + } + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, imp: self_, "ICE gathering completed"); + + self_.send_offer(); + } + _ => (), + } + }); + self.webrtcbin.connect("on-negotiation-needed", false, { move |args| { let webrtcbin = args[0].get::().unwrap(); @@ -318,6 +407,7 @@ impl ObjectImpl for WhipSink { Some(ele) => ele, None => return, }; + let whipsink = ele.imp(); let offer_sdp = match reply { @@ -343,13 +433,11 @@ impl ObjectImpl for WhipSink { return; } }; - if let Err(e) = whipsink.send_offer(offer_sdp) { - gst::element_error!( - ele, - gst::ResourceError::Failed, - ["Error in 'send_offer' - {}", e.to_string()] - ); - } + + whipsink.webrtcbin.emit_by_name::<()>( + "set-local-description", + &[&offer_sdp, &None::], + ); }); whipsink @@ -368,6 +456,8 @@ impl ObjectImpl for WhipSink { None } }); + + obj.add(&self.webrtcbin).unwrap(); } } @@ -377,6 +467,7 @@ impl ObjectSubclass for WhipSink { type Type = super::WhipSink; type ParentType = gst::Bin; } + impl WhipSink { fn lookup_ice_servers(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> { let settings = self.settings.lock().unwrap(); @@ -464,28 +555,18 @@ impl WhipSink { } } - fn send_offer( - &self, - offer: gst_webrtc::WebRTCSessionDescription, - ) -> Result<(), gst::ErrorMessage> { + fn send_offer(&self) { let settings = self.settings.lock().unwrap(); - self.webrtcbin - .emit_by_name::<()>("set-local-description", &[&offer, &None::]); - - if settings.whip_endpoint.is_none() { - return Err(gst::error_msg!( - gst::ResourceError::NotFound, - ["Endpoint URL must be set"] - )); - } - + /* Note that we check for a valid WHIP endpoint in change_state */ let endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()); if let Err(e) = endpoint { - return Err(gst::error_msg!( + element_imp_error!( + self, gst::ResourceError::Failed, ["Could not parse endpoint URL: {}", e] - )); + ); + return; } drop(settings); @@ -493,12 +574,44 @@ impl WhipSink { *state = State::Post { redirects: 0 }; drop(state); - let answer = self.do_post(offer, endpoint.unwrap())?; + let local_desc = self + .webrtcbin + .property::>("local-description"); - self.webrtcbin - .emit_by_name::<()>("set-remote-description", &[&answer, &None::]); + let offer_sdp = match local_desc { + None => { + element_imp_error!( + self, + gst::ResourceError::Failed, + ["Local description is not set"] + ); + return; + } + Some(offer) => offer, + }; - Ok(()) + gst::debug!( + CAT, + imp: self, + "Sending offer SDP: {:?}", + offer_sdp.sdp().as_text() + ); + + match self.do_post(offer_sdp, endpoint.unwrap()) { + Ok(answer) => { + self.webrtcbin.emit_by_name::<()>( + "set-remote-description", + &[&answer, &None::], + ); + } + Err(e) => { + element_imp_error!( + self, + gst::ResourceError::Failed, + ["Failed to send offer: {}", e] + ); + } + } } fn do_post(