diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index c628fc7c..fd6257ce 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7923,6 +7923,56 @@ "type": "gchararray", "writable": true }, + "ice-transport-policy": { + "blurb": "The policy to apply for ICE transport", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "all (0)", + "mutable": "null", + "readable": true, + "type": "GstRsWebRTCICETransportPolicy", + "writable": true + }, + "stun-server": { + "blurb": "The STUN server of the form stun://hostname:port", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, + "timeout": { + "blurb": "Value in seconds to timeout WHIP endpoint requests (0 = No timeout).", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "15", + "max": "3600", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, + "turn-server": { + "blurb": "The TURN server of the form turn(s)://username:password@host:port.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "NULL", + "mutable": "null", + "readable": true, + "type": "gchararray", + "writable": true + }, "use-link-headers": { "blurb": "Use link headers to configure ice-servers from the WHIP server response to the POST or OPTIONS request.\n If set to TRUE and the WHIP server returns valid ice-servers,\n this property overrides the ice-servers values set using the stun-server and turn-server properties.", "conditionally-available": false, diff --git a/net/webrtchttp/src/whipsink/imp.rs b/net/webrtchttp/src/whipsink/imp.rs index 4677824b..ee712a3c 100644 --- a/net/webrtchttp/src/whipsink/imp.rs +++ b/net/webrtchttp/src/whipsink/imp.rs @@ -8,7 +8,9 @@ // SPDX-License-Identifier: MPL-2.0 use crate::utils::{build_reqwest_client, parse_redirect_location, set_ice_servers, wait}; +use crate::GstRsWebRTCICETransportPolicy; use futures::future; +use gst::element_imp_error; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; @@ -25,6 +27,8 @@ static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new("whipsink", gst::DebugColorFlags::empty(), Some("WHIP Sink")) }); +const DEFAULT_ICE_TRANSPORT_POLICY: GstRsWebRTCICETransportPolicy = + GstRsWebRTCICETransportPolicy::All; const MAX_REDIRECTS: u8 = 10; #[derive(Debug, Clone)] @@ -32,6 +36,9 @@ struct Settings { whip_endpoint: Option, use_link_headers: bool, auth_token: Option, + turn_server: Option, + stun_server: Option, + ice_transport_policy: GstRsWebRTCICETransportPolicy, } #[allow(clippy::derivable_impls)] @@ -41,6 +48,9 @@ impl Default for Settings { whip_endpoint: None, use_link_headers: false, auth_token: None, + stun_server: None, + turn_server: None, + ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, } } } @@ -206,7 +216,22 @@ impl ObjectImpl for WhipSink { .nick("Authorization Token") .blurb("Authentication token to use, will be sent in the HTTP Header as 'Bearer '") .mutable_ready() - .build() + .build(), + + glib::ParamSpecString::builder("stun-server") + .nick("STUN Server") + .blurb("The STUN server of the form stun://hostname:port") + .build(), + + glib::ParamSpecString::builder("turn-server") + .nick("TURN Server") + .blurb("The TURN server of the form turn(s)://username:password@host:port.") + .build(), + + glib::ParamSpecEnum::builder::("ice-transport-policy", DEFAULT_ICE_TRANSPORT_POLICY) + .nick("ICE transport policy") + .blurb("The policy to apply for ICE transport") + .build(), ] }); PROPERTIES.as_ref() @@ -228,6 +253,36 @@ impl ObjectImpl for WhipSink { let mut settings = self.settings.lock().unwrap(); settings.auth_token = value.get().expect("Auth token should be a string"); } + "stun-server" => { + let mut settings = self.settings.lock().unwrap(); + settings.stun_server = value + .get::>() + .expect("type checked upstream"); + self.webrtcbin + .set_property("stun-server", settings.stun_server.as_ref()); + } + "turn-server" => { + let mut settings = self.settings.lock().unwrap(); + settings.turn_server = value + .get::>() + .expect("type checked upstream"); + self.webrtcbin + .set_property("turn-server", settings.turn_server.as_ref()); + } + "ice-transport-policy" => { + let mut settings = self.settings.lock().unwrap(); + settings.ice_transport_policy = value + .get::() + .expect("ice-transport-policy should be an enum value"); + + if settings.ice_transport_policy == GstRsWebRTCICETransportPolicy::Relay { + self.webrtcbin + .set_property_from_str("ice-transport-policy", "relay"); + } else { + self.webrtcbin + .set_property_from_str("ice-transport-policy", "all"); + } + } _ => unimplemented!(), } } @@ -246,6 +301,18 @@ impl ObjectImpl for WhipSink { let settings = self.settings.lock().unwrap(); settings.auth_token.to_value() } + "stun-server" => { + let settings = self.settings.lock().unwrap(); + settings.stun_server.to_value() + } + "turn-server" => { + let settings = self.settings.lock().unwrap(); + settings.turn_server.to_value() + } + "ice-transport-policy" => { + let settings = self.settings.lock().unwrap(); + settings.ice_transport_policy.to_value() + } _ => unimplemented!(), } } @@ -256,12 +323,34 @@ impl ObjectImpl for WhipSink { let obj = self.obj(); obj.set_suppressed_flags(gst::ElementFlags::SINK | gst::ElementFlags::SOURCE); obj.set_element_flags(gst::ElementFlags::SINK); - obj.add(&self.webrtcbin).unwrap(); // The spec requires all m= lines to be bundled (section 4.2) self.webrtcbin .set_property("bundle-policy", gst_webrtc::WebRTCBundlePolicy::MaxBundle); + let self_weak = self.downgrade(); + self.webrtcbin + .connect_notify(Some("ice-gathering-state"), move |webrtcbin, _pspec| { + let self_ = match self_weak.upgrade() { + Some(self_) => self_, + None => return, + }; + + let state = webrtcbin.property::("ice-gathering-state"); + + match state { + WebRTCICEGatheringState::Gathering => { + gst::info!(CAT, imp: self_, "ICE gathering started") + } + WebRTCICEGatheringState::Complete => { + gst::info!(CAT, imp: self_, "ICE gathering completed"); + + self_.send_offer(); + } + _ => (), + } + }); + self.webrtcbin.connect("on-negotiation-needed", false, { move |args| { let webrtcbin = args[0].get::().unwrap(); @@ -318,6 +407,7 @@ impl ObjectImpl for WhipSink { Some(ele) => ele, None => return, }; + let whipsink = ele.imp(); let offer_sdp = match reply { @@ -343,13 +433,11 @@ impl ObjectImpl for WhipSink { return; } }; - if let Err(e) = whipsink.send_offer(offer_sdp) { - gst::element_error!( - ele, - gst::ResourceError::Failed, - ["Error in 'send_offer' - {}", e.to_string()] - ); - } + + whipsink.webrtcbin.emit_by_name::<()>( + "set-local-description", + &[&offer_sdp, &None::], + ); }); whipsink @@ -368,6 +456,8 @@ impl ObjectImpl for WhipSink { None } }); + + obj.add(&self.webrtcbin).unwrap(); } } @@ -377,6 +467,7 @@ impl ObjectSubclass for WhipSink { type Type = super::WhipSink; type ParentType = gst::Bin; } + impl WhipSink { fn lookup_ice_servers(&self, endpoint: reqwest::Url) -> Result<(), ErrorMessage> { let settings = self.settings.lock().unwrap(); @@ -464,28 +555,18 @@ impl WhipSink { } } - fn send_offer( - &self, - offer: gst_webrtc::WebRTCSessionDescription, - ) -> Result<(), gst::ErrorMessage> { + fn send_offer(&self) { let settings = self.settings.lock().unwrap(); - self.webrtcbin - .emit_by_name::<()>("set-local-description", &[&offer, &None::]); - - if settings.whip_endpoint.is_none() { - return Err(gst::error_msg!( - gst::ResourceError::NotFound, - ["Endpoint URL must be set"] - )); - } - + /* Note that we check for a valid WHIP endpoint in change_state */ let endpoint = reqwest::Url::parse(settings.whip_endpoint.as_ref().unwrap().as_str()); if let Err(e) = endpoint { - return Err(gst::error_msg!( + element_imp_error!( + self, gst::ResourceError::Failed, ["Could not parse endpoint URL: {}", e] - )); + ); + return; } drop(settings); @@ -493,12 +574,44 @@ impl WhipSink { *state = State::Post { redirects: 0 }; drop(state); - let answer = self.do_post(offer, endpoint.unwrap())?; + let local_desc = self + .webrtcbin + .property::>("local-description"); - self.webrtcbin - .emit_by_name::<()>("set-remote-description", &[&answer, &None::]); + let offer_sdp = match local_desc { + None => { + element_imp_error!( + self, + gst::ResourceError::Failed, + ["Local description is not set"] + ); + return; + } + Some(offer) => offer, + }; - Ok(()) + gst::debug!( + CAT, + imp: self, + "Sending offer SDP: {:?}", + offer_sdp.sdp().as_text() + ); + + match self.do_post(offer_sdp, endpoint.unwrap()) { + Ok(answer) => { + self.webrtcbin.emit_by_name::<()>( + "set-remote-description", + &[&answer, &None::], + ); + } + Err(e) => { + element_imp_error!( + self, + gst::ResourceError::Failed, + ["Failed to send offer: {}", e] + ); + } + } } fn do_post(