diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 9d409956..1f0869f3 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -12301,6 +12301,20 @@ "type": "GstWebRTCSrcPad" } }, + "properties": { + "connect-to-first-producer": { + "blurb": "When enabled, automatically connect to the first peer that becomes available if no 'peer-id' is specified.", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "ready", + "readable": true, + "type": "gboolean", + "writable": true + } + }, "rank": "primary" }, "whipclientsink": { diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index 2aae4a3a..541960eb 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -43,6 +43,7 @@ pub struct Settings { role: WebRTCSignallerRole, headers: Option, insecure_tls: bool, + connect_to_first_producer: bool, } impl Default for Settings { @@ -54,6 +55,7 @@ impl Default for Settings { role: Default::default(), headers: None, insecure_tls: DEFAULT_INSECURE_TLS, + connect_to_first_producer: false, } } } @@ -80,6 +82,14 @@ impl Signaller { self.settings.lock().unwrap().uri.clone() } + pub fn connect_to_first_producer(&self) -> bool { + self.settings.lock().unwrap().connect_to_first_producer + } + + pub fn set_connect_to_first_producer(&self, value: bool) { + self.settings.lock().unwrap().connect_to_first_producer = value; + } + fn set_uri(&self, uri: &str) -> Result<(), Error> { let mut settings = self.settings.lock().unwrap(); let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?; @@ -100,30 +110,55 @@ impl Signaller { } } + if let Some(connect_to_first_producer) = uri + .query_pairs() + .find(|(k, _)| k == "connect-to-first-producer") + .map(|v| matches!(v.1.to_lowercase().as_str(), "true" | "1" | "")) + { + if !matches!(settings.role, WebRTCSignallerRole::Consumer) { + gst::warning!( + CAT, + "Setting connect-to-first-producer doesn't make sense for {:?}", + settings.role + ); + } else { + settings.connect_to_first_producer = connect_to_first_producer; + } + } + if let Some(peer_id) = &settings.producer_peer_id { uri.query_pairs_mut() .clear() .append_pair("peer-id", peer_id); } + if settings.connect_to_first_producer { + uri.query_pairs_mut() + .clear() + .append_pair("connect-to-first-producer", "true"); + } + settings.uri = uri; Ok(()) } async fn connect(&self) -> Result<(), Error> { - let (cafile, insecure_tls, role) = { + let (cafile, insecure_tls, role, connect_to_first_producer) = { let settings = self.settings.lock().unwrap(); ( settings.cafile.clone(), settings.insecure_tls, settings.role, + settings.connect_to_first_producer, ) }; if let super::WebRTCSignallerRole::Consumer = role { - self.producer_peer_id() - .ok_or_else(|| anyhow!("No target producer peer id set"))?; + if !connect_to_first_producer { + self.producer_peer_id() + .ok_or_else(|| anyhow!("No target producer peer id set"))?; + } } let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder(); @@ -338,7 +373,11 @@ impl Signaller { match msg { p::OutgoingMessage::Welcome { peer_id } => { self.set_status(meta, &peer_id); - self.start_session(); + if self.producer_peer_id().is_none() { + self.send(p::IncomingMessage::List); + } else { + self.start_session(); + } } p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { meta, @@ -472,6 +511,21 @@ impl Signaller { } }, p::OutgoingMessage::List { producers } => { + let mut settings = self.settings.lock().unwrap(); + let role = settings.role; + if matches!(role, super::WebRTCSignallerRole::Consumer) + && settings.producer_peer_id.is_none() + { + if let Some(producer) = producers.first() { + settings.producer_peer_id = Some(producer.id.clone()); + + drop(settings); + + self.start_session(); + + return ControlFlow::Continue(()); + } + } for producer in producers { let mut state = self.state.lock().unwrap(); if !state.producers.contains(&producer.id) { diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index be4fefdc..c60a630e 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -1687,7 +1687,51 @@ impl Default for State { #[derive(Default)] pub struct WebRTCSrc {} -impl ObjectImpl for WebRTCSrc {} +impl ObjectImpl for WebRTCSrc { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: LazyLock> = LazyLock::new(|| { + vec![glib::ParamSpecBoolean::builder("connect-to-first-producer") + .nick("Connect to first peer") + .blurb( + "When enabled, automatically connect to the first peer that becomes available \ + if no 'peer-id' is specified.", + ) + .default_value(false) + .mutable_ready() + .build()] + }); + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let obj = self.obj(); + let base = obj.upcast_ref::().imp(); + match pspec.name() { + "connect-to-first-producer" => base + .signaller() + .downcast::() + .unwrap() + .imp() + .set_connect_to_first_producer(value.get().unwrap()), + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let obj = self.obj(); + let base = obj.upcast_ref::().imp(); + match pspec.name() { + "connect-to-first-producer" => base + .signaller() + .downcast::() + .unwrap() + .imp() + .connect_to_first_producer() + .into(), + _ => unimplemented!(), + } + } +} impl GstObjectImpl for WebRTCSrc {}