From 2ac560975cf0f8ad53e9742206b6e6b0dcfa1ca2 Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Tue, 21 Mar 2023 16:16:16 +1100 Subject: [PATCH] webrtc/signaller: emit the relevant signals instead of the interface vtable In order to support the use case of an external user providing their own signalling mechanism, we want the signals to be used and only if nothing is connected, fallback to the default handling. Calling the interface vtable directly will bypass the signal emission entirely. Also ensure that the signals are defined properly for this case. i.e. 1. Signals the the application/external code is expected to emit are marked as an action signal. 2. Add accumulators to avoid calling the default class handler if another signal handler is connected. Part-of: --- net/webrtc/src/aws_kvs_signaller/imp.rs | 17 +- net/webrtc/src/signaller/iface.rs | 199 +++++++++++++++++++----- net/webrtc/src/signaller/imp.rs | 6 +- net/webrtc/src/webrtcsink/imp.rs | 2 +- net/webrtc/src/webrtcsrc/imp.rs | 7 +- 5 files changed, 175 insertions(+), 56 deletions(-) diff --git a/net/webrtc/src/aws_kvs_signaller/imp.rs b/net/webrtc/src/aws_kvs_signaller/imp.rs index 54f84d0d..b982c74c 100644 --- a/net/webrtc/src/aws_kvs_signaller/imp.rs +++ b/net/webrtc/src/aws_kvs_signaller/imp.rs @@ -100,19 +100,16 @@ impl Signaller { ); self.obj().emit_by_name::<()>( "session-requested", - &[&msg.sender_client_id, &msg.sender_client_id], - ); - self.obj().emit_by_name::<()>( - "session-description", &[ &msg.sender_client_id, - &gst_webrtc::WebRTCSessionDescription::new( + &msg.sender_client_id, + &Some(gst_webrtc::WebRTCSessionDescription::new( gst_webrtc::WebRTCSDPType::Offer, gst_sdp::SDPMessage::parse_buffer( sdp_msg.sdp.as_bytes(), ) .unwrap(), - ), + )), ], ); } else { @@ -355,7 +352,7 @@ impl Signaller { self.obj().connect_closure( "consumer-added", false, - glib::closure!(|_webrtcsink: &gst::Element, + glib::closure!(|_signaller: &super::AwsKvsSignaller, _consumer_identifier: &str, webrtcbin: &gst::Element| { webrtcbin.set_property( @@ -560,7 +557,7 @@ impl SignallableImpl for Signaller { &self, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, _sdp_mid: Option, ) { let state = self.state.lock().unwrap(); @@ -570,8 +567,8 @@ impl SignallableImpl for Signaller { message_payload: BASE64.encode( &serde_json::to_string(&p::OutgoingIceCandidate { candidate: candidate.to_string(), - sdp_mid: sdp_m_line_index.unwrap().to_string(), - sdp_m_line_index: sdp_m_line_index.unwrap(), + sdp_mid: sdp_m_line_index.to_string(), + sdp_m_line_index, }) .unwrap() .into_bytes(), diff --git a/net/webrtc/src/signaller/iface.rs b/net/webrtc/src/signaller/iface.rs index d5b73631..fc9ea6de 100644 --- a/net/webrtc/src/signaller/iface.rs +++ b/net/webrtc/src/signaller/iface.rs @@ -10,7 +10,7 @@ pub struct Signallable { pub start: fn(&super::Signallable), pub stop: fn(&super::Signallable), pub send_sdp: fn(&super::Signallable, &str, &gst_webrtc::WebRTCSessionDescription), - pub add_ice: fn(&super::Signallable, &str, &str, Option, Option), + pub add_ice: fn(&super::Signallable, &str, &str, u32, Option), pub end_session: fn(&super::Signallable, &str), } @@ -30,7 +30,7 @@ impl Signallable { _iface: &super::Signallable, _session_id: &str, _candidate: &str, - _sdp_m_line_index: Option, + _sdp_m_line_index: u32, _sdp_mid: Option, ) { } @@ -58,10 +58,33 @@ unsafe impl prelude::ObjectInterface for Signallable { * @self: The object implementing #GstRSWebRTCSignallableIface * @session-id: The ID of the session that ended * - * Some WebRTC Session was closed. + * Notify the underlying webrtc object that a session has ended. */ Signal::builder("session-ended") + .flags(glib::SignalFlags::ACTION) .param_types([str::static_type()]) + // in order to have an accumulator actually do something, we need to have a + // return value (glib limitation). Use a dummy bool for this purpose. + .return_type::() + .class_handler(|_token, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + let arg1 = args[1usize].get::<&str>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 1usize, e) + }); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.end_session)(arg0, arg1); + + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false + }) .build(), /** * GstRSWebRTCSignallableIface::producer-added: @@ -89,7 +112,7 @@ unsafe impl prelude::ObjectInterface for Signallable { * GstRSWebRTCSignallableIface::session-started: * @self: The object implementing #GstRSWebRTCSignallableIface * - * A new session started, + * Notify the underlying webrtc object that a session has started. */ Signal::builder("session-started") .param_types([str::static_type(), str::static_type()]) @@ -100,6 +123,9 @@ unsafe impl prelude::ObjectInterface for Signallable { * @session_id: The ID of the producer that was added * @peer_id: The ID of the consumer peer who wants to initiate a * session + * + * Notify the underlying webrtc object that a session has been requested from the + * peer. */ Signal::builder("session-requested") .param_types([ @@ -134,6 +160,10 @@ unsafe impl prelude::ObjectInterface for Signallable { }); Some(Signallable::request_meta(arg0).to_value()) }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false + }) .build(), /** * GstRSWebRTCSignallableIface::handle-ice: @@ -142,8 +172,11 @@ unsafe impl prelude::ObjectInterface for Signallable { * @sdp_m_line_index: The mlineindex of the ice candidate * @sdp_mid: Media ID of the ice candidate * @candiate: Information about the candidate + * + * Notify the underlying webrtc object of an ICE candidate. */ Signal::builder("handle-ice") + .flags(glib::SignalFlags::ACTION) .param_types([ str::static_type(), u32::static_type(), @@ -156,8 +189,11 @@ unsafe impl prelude::ObjectInterface for Signallable { * @self: The object implementing #GstRSWebRTCSignallableIface * @session_id: Id of the session being described * @description: The WebRTC session description + * + * Notify the underlying webrtc object of a received session description */ Signal::builder("session-description") + .flags(glib::SignalFlags::ACTION) .param_types([ str::static_type(), gst_webrtc::WebRTCSessionDescription::static_type(), @@ -170,16 +206,23 @@ unsafe impl prelude::ObjectInterface for Signallable { * Starts the signaller, connecting it to the signalling server. */ Signal::builder("start") - .flags(glib::SignalFlags::ACTION) + .run_last() + .return_type::() .class_handler(|_token, args| { let arg0 = args[0usize] .get::<&super::Signallable>() .unwrap_or_else(|e| { panic!("Wrong type for argument {}: {:?}", 0usize, e) }); - Signallable::start(arg0); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.start)(arg0); - None + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false }) .build(), /** @@ -189,23 +232,32 @@ unsafe impl prelude::ObjectInterface for Signallable { * Stops the signaller, disconnecting it to the signalling server. */ Signal::builder("stop") - .flags(glib::SignalFlags::ACTION) + .run_last() + .return_type::() .class_handler(|_tokens, args| { let arg0 = args[0usize] .get::<&super::Signallable>() .unwrap_or_else(|e| { panic!("Wrong type for argument {}: {:?}", 0usize, e) }); - Signallable::stop(arg0); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.stop)(arg0); - None + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false }) .build(), /** * GstRSWebRTCSignallableIface::shutdown: * @self: The object implementing #GstRSWebRTCSignallableIface */ - Signal::builder("shutdown").build(), + Signal::builder("shutdown") + .flags(glib::SignalFlags::ACTION) + .build(), /** * GstRSWebRTCSignallableIface::consumer-added: * @self: The object implementing #GstRSWebRTCSignallableIface @@ -226,9 +278,96 @@ unsafe impl prelude::ObjectInterface for Signallable { * This signal is emitted right after the connection with a consumer * has been dropped. */ - glib::subclass::Signal::builder("consumer-removed") + Signal::builder("consumer-removed") .param_types([String::static_type(), gst::Element::static_type()]) .build(), + /** + * GstRSWebRTCSignallableIface::send-session-description: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @session_id: Id of the session being described + * @description: The WebRTC session description to send to the peer + * + * Send @description to the peer. + */ + Signal::builder("send-session-description") + .run_last() + .param_types([ + str::static_type(), + gst_webrtc::WebRTCSessionDescription::static_type(), + ]) + .return_type::() + .class_handler(|_tokens, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + let arg1 = args[1usize].get::<&str>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 1usize, e) + }); + let arg2 = args[2usize] + .get::<&gst_webrtc::WebRTCSessionDescription>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 2usize, e) + }); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.send_sdp)(arg0, arg1, arg2); + + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false + }) + .build(), + /** + * GstRSWebRTCSignallableIface::send-ice: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @session_id: Id of the session being described + * @candidate: The ICE candidate description to send to the peer + * @sdp_m_line_index: The M-line of the session description this candidate applies to + * @sdp_mid: The MID this candidate applies to + * + * Send @candidate to the peer. + */ + Signal::builder("send-ice") + .param_types([ + str::static_type(), + str::static_type(), + u32::static_type(), + String::static_type(), + ]) + .return_type::() + .class_handler(|_tokens, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + let arg1 = args[1usize].get::<&str>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 1usize, e) + }); + let arg2 = args[2usize].get::<&str>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 2usize, e) + }); + let arg3 = args[3usize].get::().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 2usize, e) + }); + let arg4 = args[4usize].get::>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 2usize, e) + }); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.add_ice)(arg0, arg1, arg2, arg3, arg4); + + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false + }) + .build(), ] }); SIGNALS.as_ref() @@ -279,7 +418,7 @@ where this: &super::Signallable, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, sdp_mid: Option, ) { let this = this @@ -310,7 +449,7 @@ pub trait SignallableImpl: object::ObjectImpl + 'static { &self, _session_id: &str, _candidate: &str, - _sdp_m_line_index: Option, + _sdp_m_line_index: u32, _sdp_mid: Option, ) { } @@ -325,7 +464,7 @@ pub trait SignallableExt: 'static { &self, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, sdp_mid: Option, ); fn end_session(&self, session_id: &str); @@ -333,43 +472,31 @@ pub trait SignallableExt: 'static { impl> SignallableExt for Obj { fn start(&self) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.start)(obj) + self.emit_by_name::("start", &[]); } fn stop(&self) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.stop)(obj) + self.emit_by_name::("stop", &[]); } fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.send_sdp)(obj, session_id, sdp) + self.emit_by_name::("send-session-description", &[&session_id, sdp]); } fn add_ice( &self, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, sdp_mid: Option, ) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.add_ice)(obj, session_id, candidate, sdp_m_line_index, sdp_mid) + self.emit_by_name::( + "send-ice", + &[&session_id, &candidate, &sdp_m_line_index, &sdp_mid], + ); } fn end_session(&self, session_id: &str) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.end_session)(obj, session_id) + self.emit_by_name::("session-ended", &[&session_id]); } } diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index 650fb335..f637fda4 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -321,7 +321,7 @@ impl Signaller { gst::info!(CAT, imp: self, "Session {session_id} ended"); self.obj() - .emit_by_name::<()>("session-ended", &[&session_id]); + .emit_by_name::("session-ended", &[&session_id]); } p::OutgoingMessage::Peer(p::PeerMessage { session_id, @@ -549,7 +549,7 @@ impl SignallableImpl for Signaller { &self, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, _sdp_mid: Option, ) { gst::debug!( @@ -562,7 +562,7 @@ impl SignallableImpl for Signaller { session_id: session_id.to_string(), peer_message: p::PeerMessageInner::Ice { candidate: candidate.to_string(), - sdp_m_line_index: sdp_m_line_index.unwrap(), + sdp_m_line_index, }, }); diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index ddd5ee4c..88ffcc34 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -1890,7 +1890,7 @@ impl WebRTCSink { let settings = self.settings.lock().unwrap(); settings .signaller - .add_ice(&session_id, &candidate, Some(sdp_m_line_index), None) + .add_ice(&session_id, &candidate, sdp_m_line_index, None) } /// Called by the signaller to add a new session diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 7821dd38..2a1e56f0 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -865,12 +865,7 @@ impl WebRTCSrc { return; } }; - signaller.add_ice( - &session_id, - &candidate, - Some(sdp_m_line_index), - None::, - ); + signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None::); } /// Called by the signaller with an ice candidate