webrtcsrc: Add a 'connect-to-first-producer' property

This is an helper property which allows to avoid requiring to know
peer IDs, which is very useful during development.

Fixes: https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/issues/386
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1996>
This commit is contained in:
Thibault Saunier 2024-12-18 16:28:58 -03:00 committed by GStreamer Marge Bot
parent 026a2a11eb
commit 1e3eef253b
3 changed files with 117 additions and 5 deletions

View file

@ -12301,6 +12301,20 @@
"type": "GstWebRTCSrcPad" "type": "GstWebRTCSrcPad"
} }
}, },
"properties": {
"connect-to-first-producer": {
"blurb": "When enabled, automatically connect to the first peer that becomes available if no 'peer-id' is specified.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "false",
"mutable": "ready",
"readable": true,
"type": "gboolean",
"writable": true
}
},
"rank": "primary" "rank": "primary"
}, },
"whipclientsink": { "whipclientsink": {

View file

@ -43,6 +43,7 @@ pub struct Settings {
role: WebRTCSignallerRole, role: WebRTCSignallerRole,
headers: Option<gst::Structure>, headers: Option<gst::Structure>,
insecure_tls: bool, insecure_tls: bool,
connect_to_first_producer: bool,
} }
impl Default for Settings { impl Default for Settings {
@ -54,6 +55,7 @@ impl Default for Settings {
role: Default::default(), role: Default::default(),
headers: None, headers: None,
insecure_tls: DEFAULT_INSECURE_TLS, insecure_tls: DEFAULT_INSECURE_TLS,
connect_to_first_producer: false,
} }
} }
} }
@ -80,6 +82,14 @@ impl Signaller {
self.settings.lock().unwrap().uri.clone() self.settings.lock().unwrap().uri.clone()
} }
pub fn connect_to_first_producer(&self) -> bool {
self.settings.lock().unwrap().connect_to_first_producer
}
pub fn set_connect_to_first_producer(&self, value: bool) {
self.settings.lock().unwrap().connect_to_first_producer = value;
}
fn set_uri(&self, uri: &str) -> Result<(), Error> { fn set_uri(&self, uri: &str) -> Result<(), Error> {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?; let mut uri = Url::from_str(uri).map_err(|err| anyhow!("{err:?}"))?;
@ -100,31 +110,56 @@ impl Signaller {
} }
} }
if let Some(connect_to_first_producer) = uri
.query_pairs()
.find(|(k, _)| k == "connect-to-first-producer")
.map(|v| matches!(v.1.to_lowercase().as_str(), "true" | "1" | ""))
{
if !matches!(settings.role, WebRTCSignallerRole::Consumer) {
gst::warning!(
CAT,
"Setting connect-to-first-producer doesn't make sense for {:?}",
settings.role
);
} else {
settings.connect_to_first_producer = connect_to_first_producer;
}
}
if let Some(peer_id) = &settings.producer_peer_id { if let Some(peer_id) = &settings.producer_peer_id {
uri.query_pairs_mut() uri.query_pairs_mut()
.clear() .clear()
.append_pair("peer-id", peer_id); .append_pair("peer-id", peer_id);
} }
if settings.connect_to_first_producer {
uri.query_pairs_mut()
.clear()
.append_pair("connect-to-first-producer", "true");
}
settings.uri = uri; settings.uri = uri;
Ok(()) Ok(())
} }
async fn connect(&self) -> Result<(), Error> { async fn connect(&self) -> Result<(), Error> {
let (cafile, insecure_tls, role) = { let (cafile, insecure_tls, role, connect_to_first_producer) = {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
( (
settings.cafile.clone(), settings.cafile.clone(),
settings.insecure_tls, settings.insecure_tls,
settings.role, settings.role,
settings.connect_to_first_producer,
) )
}; };
if let super::WebRTCSignallerRole::Consumer = role { if let super::WebRTCSignallerRole::Consumer = role {
if !connect_to_first_producer {
self.producer_peer_id() self.producer_peer_id()
.ok_or_else(|| anyhow!("No target producer peer id set"))?; .ok_or_else(|| anyhow!("No target producer peer id set"))?;
} }
}
let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder(); let mut connector_builder = tokio_native_tls::native_tls::TlsConnector::builder();
@ -338,8 +373,12 @@ impl Signaller {
match msg { match msg {
p::OutgoingMessage::Welcome { peer_id } => { p::OutgoingMessage::Welcome { peer_id } => {
self.set_status(meta, &peer_id); self.set_status(meta, &peer_id);
if self.producer_peer_id().is_none() {
self.send(p::IncomingMessage::List);
} else {
self.start_session(); self.start_session();
} }
}
p::OutgoingMessage::PeerStatusChanged(p::PeerStatus { p::OutgoingMessage::PeerStatusChanged(p::PeerStatus {
meta, meta,
roles, roles,
@ -472,6 +511,21 @@ impl Signaller {
} }
}, },
p::OutgoingMessage::List { producers } => { p::OutgoingMessage::List { producers } => {
let mut settings = self.settings.lock().unwrap();
let role = settings.role;
if matches!(role, super::WebRTCSignallerRole::Consumer)
&& settings.producer_peer_id.is_none()
{
if let Some(producer) = producers.first() {
settings.producer_peer_id = Some(producer.id.clone());
drop(settings);
self.start_session();
return ControlFlow::Continue(());
}
}
for producer in producers { for producer in producers {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if !state.producers.contains(&producer.id) { if !state.producers.contains(&producer.id) {

View file

@ -1687,7 +1687,51 @@ impl Default for State {
#[derive(Default)] #[derive(Default)]
pub struct WebRTCSrc {} pub struct WebRTCSrc {}
impl ObjectImpl for WebRTCSrc {} impl ObjectImpl for WebRTCSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![glib::ParamSpecBoolean::builder("connect-to-first-producer")
.nick("Connect to first peer")
.blurb(
"When enabled, automatically connect to the first peer that becomes available \
if no 'peer-id' is specified.",
)
.default_value(false)
.mutable_ready()
.build()]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let obj = self.obj();
let base = obj.upcast_ref::<super::BaseWebRTCSrc>().imp();
match pspec.name() {
"connect-to-first-producer" => base
.signaller()
.downcast::<Signaller>()
.unwrap()
.imp()
.set_connect_to_first_producer(value.get().unwrap()),
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let obj = self.obj();
let base = obj.upcast_ref::<super::BaseWebRTCSrc>().imp();
match pspec.name() {
"connect-to-first-producer" => base
.signaller()
.downcast::<Signaller>()
.unwrap()
.imp()
.connect_to_first_producer()
.into(),
_ => unimplemented!(),
}
}
}
impl GstObjectImpl for WebRTCSrc {} impl GstObjectImpl for WebRTCSrc {}