net/webrtc/signaller: advertise running producers in Listener mode

When starting a webrtcsrc-signaller client in Listener mode, only the producers
started after the client connection were advertised. All currently
running producers were ignored unlike the gstwebrtc-api behavior. This
commit now lists all running producers when the client Listener connects
and advertises them through the "producer-added" signal.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1296>
This commit is contained in:
Loïc Le Page 2023-08-08 13:14:03 +02:00
parent d688aeb184
commit 7af2ff0843
3 changed files with 48 additions and 10 deletions

View file

@ -6699,6 +6699,10 @@
{ {
"name": "arg1", "name": "arg1",
"type": "GstStructure" "type": "GstStructure"
},
{
"name": "arg2",
"type": "gboolean"
} }
], ],
"return-type": "void", "return-type": "void",

View file

@ -68,21 +68,29 @@ unsafe impl prelude::ObjectInterface for Signallable {
/** /**
* GstRSWebRTCSignallableIface::producer-added: * GstRSWebRTCSignallableIface::producer-added:
* @self: The object implementing #GstRSWebRTCSignallableIface * @self: The object implementing #GstRSWebRTCSignallableIface
* @producer_id: The ID of the producer that was added * @producer_id: The ID of the available producer
* @meta: The metadata structure of the producer * @meta: The metadata structure of the producer
* @new_connection: true if the producer has just connected to
* the signalling server or false if it was
* already running before starting the client
* *
* Some new producing peer is ready to produce a WebRTC stream. * Some producing peer is available to produce a WebRTC stream.
*/ */
Signal::builder("producer-added") Signal::builder("producer-added")
.param_types([str::static_type(), <Option<gst::Structure>>::static_type()]) .param_types([
str::static_type(),
<Option<gst::Structure>>::static_type(),
bool::static_type(),
])
.build(), .build(),
/** /**
* GstRSWebRTCSignallableIface::producer-removed: * GstRSWebRTCSignallableIface::producer-removed:
* @self: The object implementing #GstRSWebRTCSignallableIface * @self: The object implementing #GstRSWebRTCSignallableIface
* @producer_id: The ID of the producer that was added * @producer_id: The ID of the producer that is not available
* anymore
* @meta: The metadata structure of the producer * @meta: The metadata structure of the producer
* *
* Some new producing peer is stopped producing streams. * Some producing peer has stopped producing streams.
*/ */
Signal::builder("producer-removed") Signal::builder("producer-removed")
.param_types([str::static_type(), <Option<gst::Structure>>::static_type()]) .param_types([str::static_type(), <Option<gst::Structure>>::static_type()])

View file

@ -210,6 +210,10 @@ impl Signaller {
roles: vec![p::PeerRole::Listener], roles: vec![p::PeerRole::Listener],
}, },
})); }));
if matches!(role, super::WebRTCSignallerRole::Listener) {
self.send(p::IncomingMessage::List);
}
} }
fn producer_peer_id(&self) -> Option<String> { fn producer_peer_id(&self) -> Option<String> {
@ -282,8 +286,10 @@ impl Signaller {
state.producers.insert(peer_id.clone()); state.producers.insert(peer_id.clone());
drop(state); drop(state);
self.obj() self.obj().emit_by_name::<()>(
.emit_by_name::<()>("producer-added", &[&peer_id, &meta]); "producer-added",
&[&peer_id, &meta, &true],
);
} }
} else if state.producers.remove(&peer_id) { } else if state.producers.remove(&peer_id) {
drop(state); drop(state);
@ -366,15 +372,34 @@ impl Signaller {
); );
} }
}, },
p::OutgoingMessage::List { producers } => {
for producer in producers {
let mut state = self.state.lock().unwrap();
if !state.producers.contains(&producer.id) {
state.producers.insert(producer.id.clone());
drop(state);
let meta = producer.meta.and_then(|m| match m {
serde_json::Value::Object(v) => Some(serialize_json_object(&v)),
_ => {
gst::error!(CAT, imp: self, "Invalid json value: {m:?}");
None
}
});
self.obj().emit_by_name::<()>(
"producer-added",
&[&producer.id, &meta, &false],
);
}
}
}
p::OutgoingMessage::Error { details } => { p::OutgoingMessage::Error { details } => {
self.obj().emit_by_name::<()>( self.obj().emit_by_name::<()>(
"error", "error",
&[&format!("Error message from server: {details}")], &[&format!("Error message from server: {details}")],
); );
} }
_ => {
gst::warning!(CAT, imp: self, "Ignoring unsupported message {:?}", msg);
}
} }
} else { } else {
gst::error!(CAT, imp: self, "Unknown message from server: {}", msg); gst::error!(CAT, imp: self, "Unknown message from server: {}", msg);
@ -521,6 +546,7 @@ impl SignallableImpl for Signaller {
} }
}); });
} }
state.producers.clear();
} }
fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) {