diff --git a/net/webrtc/src/aws_kvs_signaller/imp.rs b/net/webrtc/src/aws_kvs_signaller/imp.rs index 54f84d0d..b982c74c 100644 --- a/net/webrtc/src/aws_kvs_signaller/imp.rs +++ b/net/webrtc/src/aws_kvs_signaller/imp.rs @@ -100,19 +100,16 @@ impl Signaller { ); self.obj().emit_by_name::<()>( "session-requested", - &[&msg.sender_client_id, &msg.sender_client_id], - ); - self.obj().emit_by_name::<()>( - "session-description", &[ &msg.sender_client_id, - &gst_webrtc::WebRTCSessionDescription::new( + &msg.sender_client_id, + &Some(gst_webrtc::WebRTCSessionDescription::new( gst_webrtc::WebRTCSDPType::Offer, gst_sdp::SDPMessage::parse_buffer( sdp_msg.sdp.as_bytes(), ) .unwrap(), - ), + )), ], ); } else { @@ -355,7 +352,7 @@ impl Signaller { self.obj().connect_closure( "consumer-added", false, - glib::closure!(|_webrtcsink: &gst::Element, + glib::closure!(|_signaller: &super::AwsKvsSignaller, _consumer_identifier: &str, webrtcbin: &gst::Element| { webrtcbin.set_property( @@ -560,7 +557,7 @@ impl SignallableImpl for Signaller { &self, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, _sdp_mid: Option, ) { let state = self.state.lock().unwrap(); @@ -570,8 +567,8 @@ impl SignallableImpl for Signaller { message_payload: BASE64.encode( &serde_json::to_string(&p::OutgoingIceCandidate { candidate: candidate.to_string(), - sdp_mid: sdp_m_line_index.unwrap().to_string(), - sdp_m_line_index: sdp_m_line_index.unwrap(), + sdp_mid: sdp_m_line_index.to_string(), + sdp_m_line_index, }) .unwrap() .into_bytes(), diff --git a/net/webrtc/src/signaller/iface.rs b/net/webrtc/src/signaller/iface.rs index d5b73631..fc9ea6de 100644 --- a/net/webrtc/src/signaller/iface.rs +++ b/net/webrtc/src/signaller/iface.rs @@ -10,7 +10,7 @@ pub struct Signallable { pub start: fn(&super::Signallable), pub stop: fn(&super::Signallable), pub send_sdp: fn(&super::Signallable, &str, &gst_webrtc::WebRTCSessionDescription), - pub add_ice: fn(&super::Signallable, &str, &str, Option, Option), + pub add_ice: fn(&super::Signallable, &str, &str, u32, Option), pub end_session: fn(&super::Signallable, &str), } @@ -30,7 +30,7 @@ impl Signallable { _iface: &super::Signallable, _session_id: &str, _candidate: &str, - _sdp_m_line_index: Option, + _sdp_m_line_index: u32, _sdp_mid: Option, ) { } @@ -58,10 +58,33 @@ unsafe impl prelude::ObjectInterface for Signallable { * @self: The object implementing #GstRSWebRTCSignallableIface * @session-id: The ID of the session that ended * - * Some WebRTC Session was closed. + * Notify the underlying webrtc object that a session has ended. */ Signal::builder("session-ended") + .flags(glib::SignalFlags::ACTION) .param_types([str::static_type()]) + // in order to have an accumulator actually do something, we need to have a + // return value (glib limitation). Use a dummy bool for this purpose. + .return_type::() + .class_handler(|_token, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + let arg1 = args[1usize].get::<&str>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 1usize, e) + }); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.end_session)(arg0, arg1); + + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false + }) .build(), /** * GstRSWebRTCSignallableIface::producer-added: @@ -89,7 +112,7 @@ unsafe impl prelude::ObjectInterface for Signallable { * GstRSWebRTCSignallableIface::session-started: * @self: The object implementing #GstRSWebRTCSignallableIface * - * A new session started, + * Notify the underlying webrtc object that a session has started. */ Signal::builder("session-started") .param_types([str::static_type(), str::static_type()]) @@ -100,6 +123,9 @@ unsafe impl prelude::ObjectInterface for Signallable { * @session_id: The ID of the producer that was added * @peer_id: The ID of the consumer peer who wants to initiate a * session + * + * Notify the underlying webrtc object that a session has been requested from the + * peer. */ Signal::builder("session-requested") .param_types([ @@ -134,6 +160,10 @@ unsafe impl prelude::ObjectInterface for Signallable { }); Some(Signallable::request_meta(arg0).to_value()) }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false + }) .build(), /** * GstRSWebRTCSignallableIface::handle-ice: @@ -142,8 +172,11 @@ unsafe impl prelude::ObjectInterface for Signallable { * @sdp_m_line_index: The mlineindex of the ice candidate * @sdp_mid: Media ID of the ice candidate * @candiate: Information about the candidate + * + * Notify the underlying webrtc object of an ICE candidate. */ Signal::builder("handle-ice") + .flags(glib::SignalFlags::ACTION) .param_types([ str::static_type(), u32::static_type(), @@ -156,8 +189,11 @@ unsafe impl prelude::ObjectInterface for Signallable { * @self: The object implementing #GstRSWebRTCSignallableIface * @session_id: Id of the session being described * @description: The WebRTC session description + * + * Notify the underlying webrtc object of a received session description */ Signal::builder("session-description") + .flags(glib::SignalFlags::ACTION) .param_types([ str::static_type(), gst_webrtc::WebRTCSessionDescription::static_type(), @@ -170,16 +206,23 @@ unsafe impl prelude::ObjectInterface for Signallable { * Starts the signaller, connecting it to the signalling server. */ Signal::builder("start") - .flags(glib::SignalFlags::ACTION) + .run_last() + .return_type::() .class_handler(|_token, args| { let arg0 = args[0usize] .get::<&super::Signallable>() .unwrap_or_else(|e| { panic!("Wrong type for argument {}: {:?}", 0usize, e) }); - Signallable::start(arg0); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.start)(arg0); - None + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false }) .build(), /** @@ -189,23 +232,32 @@ unsafe impl prelude::ObjectInterface for Signallable { * Stops the signaller, disconnecting it to the signalling server. */ Signal::builder("stop") - .flags(glib::SignalFlags::ACTION) + .run_last() + .return_type::() .class_handler(|_tokens, args| { let arg0 = args[0usize] .get::<&super::Signallable>() .unwrap_or_else(|e| { panic!("Wrong type for argument {}: {:?}", 0usize, e) }); - Signallable::stop(arg0); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.stop)(arg0); - None + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false }) .build(), /** * GstRSWebRTCSignallableIface::shutdown: * @self: The object implementing #GstRSWebRTCSignallableIface */ - Signal::builder("shutdown").build(), + Signal::builder("shutdown") + .flags(glib::SignalFlags::ACTION) + .build(), /** * GstRSWebRTCSignallableIface::consumer-added: * @self: The object implementing #GstRSWebRTCSignallableIface @@ -226,9 +278,96 @@ unsafe impl prelude::ObjectInterface for Signallable { * This signal is emitted right after the connection with a consumer * has been dropped. */ - glib::subclass::Signal::builder("consumer-removed") + Signal::builder("consumer-removed") .param_types([String::static_type(), gst::Element::static_type()]) .build(), + /** + * GstRSWebRTCSignallableIface::send-session-description: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @session_id: Id of the session being described + * @description: The WebRTC session description to send to the peer + * + * Send @description to the peer. + */ + Signal::builder("send-session-description") + .run_last() + .param_types([ + str::static_type(), + gst_webrtc::WebRTCSessionDescription::static_type(), + ]) + .return_type::() + .class_handler(|_tokens, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + let arg1 = args[1usize].get::<&str>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 1usize, e) + }); + let arg2 = args[2usize] + .get::<&gst_webrtc::WebRTCSessionDescription>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 2usize, e) + }); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.send_sdp)(arg0, arg1, arg2); + + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false + }) + .build(), + /** + * GstRSWebRTCSignallableIface::send-ice: + * @self: The object implementing #GstRSWebRTCSignallableIface + * @session_id: Id of the session being described + * @candidate: The ICE candidate description to send to the peer + * @sdp_m_line_index: The M-line of the session description this candidate applies to + * @sdp_mid: The MID this candidate applies to + * + * Send @candidate to the peer. + */ + Signal::builder("send-ice") + .param_types([ + str::static_type(), + str::static_type(), + u32::static_type(), + String::static_type(), + ]) + .return_type::() + .class_handler(|_tokens, args| { + let arg0 = args[0usize] + .get::<&super::Signallable>() + .unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 0usize, e) + }); + let arg1 = args[1usize].get::<&str>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 1usize, e) + }); + let arg2 = args[2usize].get::<&str>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 2usize, e) + }); + let arg3 = args[3usize].get::().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 2usize, e) + }); + let arg4 = args[4usize].get::>().unwrap_or_else(|e| { + panic!("Wrong type for argument {}: {:?}", 2usize, e) + }); + let vtable = arg0.interface::().unwrap(); + let vtable = vtable.as_ref(); + (vtable.add_ice)(arg0, arg1, arg2, arg3, arg4); + + Some(false.into()) + }) + .accumulator(move |_hint, output, input| { + *output = input.clone(); + false + }) + .build(), ] }); SIGNALS.as_ref() @@ -279,7 +418,7 @@ where this: &super::Signallable, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, sdp_mid: Option, ) { let this = this @@ -310,7 +449,7 @@ pub trait SignallableImpl: object::ObjectImpl + 'static { &self, _session_id: &str, _candidate: &str, - _sdp_m_line_index: Option, + _sdp_m_line_index: u32, _sdp_mid: Option, ) { } @@ -325,7 +464,7 @@ pub trait SignallableExt: 'static { &self, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, sdp_mid: Option, ); fn end_session(&self, session_id: &str); @@ -333,43 +472,31 @@ pub trait SignallableExt: 'static { impl> SignallableExt for Obj { fn start(&self) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.start)(obj) + self.emit_by_name::("start", &[]); } fn stop(&self) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.stop)(obj) + self.emit_by_name::("stop", &[]); } fn send_sdp(&self, session_id: &str, sdp: &gst_webrtc::WebRTCSessionDescription) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.send_sdp)(obj, session_id, sdp) + self.emit_by_name::("send-session-description", &[&session_id, sdp]); } fn add_ice( &self, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, sdp_mid: Option, ) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.add_ice)(obj, session_id, candidate, sdp_m_line_index, sdp_mid) + self.emit_by_name::( + "send-ice", + &[&session_id, &candidate, &sdp_m_line_index, &sdp_mid], + ); } fn end_session(&self, session_id: &str) { - let obj = self.upcast_ref::(); - let vtable = obj.interface::().unwrap(); - let vtable = vtable.as_ref(); - (vtable.end_session)(obj, session_id) + self.emit_by_name::("session-ended", &[&session_id]); } } diff --git a/net/webrtc/src/signaller/imp.rs b/net/webrtc/src/signaller/imp.rs index 650fb335..f637fda4 100644 --- a/net/webrtc/src/signaller/imp.rs +++ b/net/webrtc/src/signaller/imp.rs @@ -321,7 +321,7 @@ impl Signaller { gst::info!(CAT, imp: self, "Session {session_id} ended"); self.obj() - .emit_by_name::<()>("session-ended", &[&session_id]); + .emit_by_name::("session-ended", &[&session_id]); } p::OutgoingMessage::Peer(p::PeerMessage { session_id, @@ -549,7 +549,7 @@ impl SignallableImpl for Signaller { &self, session_id: &str, candidate: &str, - sdp_m_line_index: Option, + sdp_m_line_index: u32, _sdp_mid: Option, ) { gst::debug!( @@ -562,7 +562,7 @@ impl SignallableImpl for Signaller { session_id: session_id.to_string(), peer_message: p::PeerMessageInner::Ice { candidate: candidate.to_string(), - sdp_m_line_index: sdp_m_line_index.unwrap(), + sdp_m_line_index, }, }); diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index ddd5ee4c..88ffcc34 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -1890,7 +1890,7 @@ impl WebRTCSink { let settings = self.settings.lock().unwrap(); settings .signaller - .add_ice(&session_id, &candidate, Some(sdp_m_line_index), None) + .add_ice(&session_id, &candidate, sdp_m_line_index, None) } /// Called by the signaller to add a new session diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 7821dd38..2a1e56f0 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -865,12 +865,7 @@ impl WebRTCSrc { return; } }; - signaller.add_ice( - &session_id, - &candidate, - Some(sdp_m_line_index), - None::, - ); + signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None::); } /// Called by the signaller with an ice candidate