From 01e28ddfe205cfd9572211f2b9bcce4522d2f8ec Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 13 Aug 2024 17:20:27 +0200 Subject: [PATCH] webrtcsink: implement generic data channel control mechanism .. .. and deprecate data channel navigation in favor of it. A new property, "enable-data-channel-control" is exposed, when set to TRUE a control data channel is offered, over which can be sent typed upstream events. This means further upstream events will be usable, for now only navigation and custom upstream events are handled. In addition, send response messages to notify the consumer of whether its requests have been handled. In the future this can also be extended to allow the consumer to send queries, or seek events .. Part-of: --- net/webrtc/README.md | 6 +- net/webrtc/gstwebrtc-api/README.md | 2 +- .../gstwebrtc-api/src/consumer-session.js | 2 +- .../gstwebrtc-api/src/remote-controller.js | 74 +++-- net/webrtc/src/utils.rs | 34 ++- net/webrtc/src/webrtcsink/imp.rs | 196 ++++++++++++++ net/webrtc/src/webrtcsrc/imp.rs | 253 ++++++++++++------ 7 files changed, 468 insertions(+), 99 deletions(-) diff --git a/net/webrtc/README.md b/net/webrtc/README.md index edced0be..d2dab0ea 100644 --- a/net/webrtc/README.md +++ b/net/webrtc/README.md @@ -160,7 +160,7 @@ gst-launch-1.0 webrtcsink signaller::uri="ws://127.0.0.1:8443" .. `webrtcsink` implements the [`GstNavigation`] interface which allows interacting with the content, for example move with your mouse, entering keys with the keyboard, etc... On top of that a `WebRTCDataChannel` based protocol has been -implemented and can be activated with the `enable-data-channel-navigation=true` +implemented and can be activated with the `enable-control-data-channel=true` property allowing a client to send GstNavigation events using the WebRTC data channel. The [gstwebrtc-api](gstwebrtc-api) and `webrtcsrc` implement the protocol as well @@ -170,7 +170,7 @@ You can easily test this feature using the [`wpesrc`] element with the following that will start a server that allows you to navigate the GStreamer documentation: ``` shell -gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation/ ! queue ! webrtcsink enable-data-channel-navigation=true meta="meta,name=web-stream" +gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation/ ! queue ! webrtcsink enable-control-data-channel=true meta="meta,name=web-stream" ``` You can control it inside the video running within your web browser (at @@ -178,7 +178,7 @@ https://127.0.0.1:9090 if you followed previous steps in that readme) or with the following GSteamer pipeline as a client: ``` shell -gst-launch-1.0 webrtcsrc signaller::producer-peer-id= enable-data-channel-navigation=true ! videoconvert ! autovideosink +gst-launch-1.0 webrtcsrc signaller::producer-peer-id= enable-control-data-channel=true ! videoconvert ! autovideosink ``` ### Sending HTTP headers diff --git a/net/webrtc/gstwebrtc-api/README.md b/net/webrtc/gstwebrtc-api/README.md index fada157e..5cd8c915 100644 --- a/net/webrtc/gstwebrtc-api/README.md +++ b/net/webrtc/gstwebrtc-api/README.md @@ -135,7 +135,7 @@ You just need to click on the corresponding entry to connect as a consumer to th Launch the following GStreamer pipeline: ```shell -$ gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation ! queue ! webrtcsink enable-data-channel-navigation=true meta="meta,name=web-stream" +$ gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation ! queue ! webrtcsink enable-control-data-channel=true meta="meta,name=web-stream" ``` Once the GStreamer pipeline launched, you will see a new producer with the name *web-stream*. When connecting to this diff --git a/net/webrtc/gstwebrtc-api/src/consumer-session.js b/net/webrtc/gstwebrtc-api/src/consumer-session.js index b4ddd35d..d92c7a3b 100644 --- a/net/webrtc/gstwebrtc-api/src/consumer-session.js +++ b/net/webrtc/gstwebrtc-api/src/consumer-session.js @@ -197,7 +197,7 @@ export default class ConsumerSession extends WebRTCSession { connection.ondatachannel = (event) => { const rtcDataChannel = event.channel; - if (rtcDataChannel && (rtcDataChannel.label === "input")) { + if (rtcDataChannel && (rtcDataChannel.label === "control")) { if (this._remoteController) { const previousController = this._remoteController; this._remoteController = null; diff --git a/net/webrtc/gstwebrtc-api/src/remote-controller.js b/net/webrtc/gstwebrtc-api/src/remote-controller.js index 939e40e6..50c06420 100644 --- a/net/webrtc/gstwebrtc-api/src/remote-controller.js +++ b/net/webrtc/gstwebrtc-api/src/remote-controller.js @@ -93,6 +93,7 @@ export default class RemoteController extends EventTarget { this._videoElementComputedStyle = null; this._videoElementKeyboard = null; this._lastTouchEventTimestamp = 0; + this._requestCounter = 0; rtcDataChannel.addEventListener("close", () => { if (this._rtcDataChannel === rtcDataChannel) { @@ -109,8 +110,24 @@ export default class RemoteController extends EventTarget { })); } }); + + rtcDataChannel.addEventListener("message", (event) => { + try { + const msg = JSON.parse(event.data); + + if (msg.type === "ControlResponseMessage") { + this.dispatchEvent(new CustomEvent("controlResponse", { detail: msg })); + } + } catch (ex) { + this.dispatchEvent(new ErrorEvent("error", { + message: "cannot parse control message from signaling server", + error: ex + })); + } + }); } + /** * The underlying WebRTC data channel connected to a remote GStreamer webrtcsink producer offering remote control. * The value may be null if the remote controller has been closed. @@ -179,6 +196,42 @@ export default class RemoteController extends EventTarget { } } + /** + * Send a request over the control data channel.
+ * + * @method GstWebRTCAPI.RemoteController#sendControlRequest + * @fires {@link GstWebRTCAPI#event:ErrorEvent} + * @param {object} request - The request to stringify and send over the channel + * @param {string} request.type - The type of the request + * @returns {number} The identifier attributed to the request, or -1 if an exception occurred + */ + sendControlRequest(request) { + try { + if (!request || (typeof (request) !== "object")) { + throw new Error("invalid request"); + } + + if (!this._rtcDataChannel) { + throw new Error("remote controller data channel is closed"); + } + + let message = { + id: this._requestCounter++, + request: request + }; + + this._rtcDataChannel.send(JSON.stringify(message)); + + return message.id; + } catch (ex) { + this.dispatchEvent(new ErrorEvent("error", { + message: `cannot send control message over session ${this._consumerSession.sessionId} remote controller`, + error: ex + })); + return -1; + } + } + /** * Closes the remote controller channel.
* It immediately shuts down the underlying WebRTC data channel connected to a remote GStreamer webrtcsink @@ -198,22 +251,11 @@ export default class RemoteController extends EventTarget { } _sendGstNavigationEvent(data) { - try { - if (!data || (typeof (data) !== "object")) { - throw new Error("invalid GstNavigation event"); - } - - if (!this._rtcDataChannel) { - throw new Error("remote controller data channel is closed"); - } - - this._rtcDataChannel.send(JSON.stringify(data)); - } catch (ex) { - this.dispatchEvent(new ErrorEvent("error", { - message: `cannot send GstNavigation event over session ${this._consumerSession.sessionId} remote controller`, - error: ex - })); - } + let request = { + type: "navigationEvent", + event: data + }; + this.sendControlRequest(request); } _computeVideoMousePosition(event) { diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index e95a0929..b0005682 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -999,13 +999,45 @@ pub fn cleanup_codec_caps(mut caps: gst::Caps) -> gst::Caps { caps } -#[derive(Debug, serde::Deserialize, serde::Serialize)] +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] pub struct NavigationEvent { pub mid: Option, #[serde(flatten)] pub event: gst_video::NavigationEvent, } +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +pub enum ControlRequest { + NavigationEvent { + event: gst_video::NavigationEvent, + }, + #[serde(rename_all = "camelCase")] + CustomUpstreamEvent { + structure_name: String, + structure: serde_json::Value, + }, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct ControlRequestMessage { + pub id: u64, + pub mid: Option, + pub request: ControlRequest, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub struct ControlResponseMessage { + pub id: u64, + pub error: Option, +} + pub fn find_smallest_available_ext_id(ids: impl IntoIterator) -> u32 { let used_numbers: HashSet<_> = ids.into_iter().collect(); (1..).find(|&num| !used_numbers.contains(&num)).unwrap() diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 3100007c..fda02267 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -70,6 +70,7 @@ const DEFAULT_DO_FEC: bool = true; const DEFAULT_DO_RETRANSMISSION: bool = true; const DEFAULT_DO_CLOCK_SIGNALLING: bool = false; const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false; +const DEFAULT_ENABLE_CONTROL_DATA_CHANNEL: bool = false; const DEFAULT_ICE_TRANSPORT_POLICY: WebRTCICETransportPolicy = WebRTCICETransportPolicy::All; const DEFAULT_START_BITRATE: u32 = 2048000; #[cfg(feature = "web_server")] @@ -109,6 +110,7 @@ struct Settings { do_retransmission: bool, do_clock_signalling: bool, enable_data_channel_navigation: bool, + enable_control_data_channel: bool, meta: Option, ice_transport_policy: WebRTCICETransportPolicy, signaller: Signallable, @@ -501,6 +503,7 @@ struct State { streams: HashMap, discoveries: HashMap>, navigation_handler: Option, + control_events_handler: Option, mids: HashMap, signaller_signals: Option, finalizing_sessions: Arc<(Mutex>, Condvar)>, @@ -548,6 +551,111 @@ fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { } } +fn deserialize_serde_value(val: &serde_json::Value) -> Result { + match val { + serde_json::Value::Null => { + return Err(anyhow!("Untyped null values are not handled")); + } + serde_json::Value::Bool(v) => Ok(v.to_send_value()), + serde_json::Value::Number(v) => { + if let Some(v) = v.as_i64() { + Ok(v.to_send_value()) + } else if let Some(v) = v.as_u64() { + Ok(v.to_send_value()) + } else if let Some(v) = v.as_f64() { + Ok(v.to_send_value()) + } else { + unreachable!() + } + } + serde_json::Value::String(v) => Ok(v.to_send_value()), + serde_json::Value::Array(a) => { + let mut gst_array = gst::Array::default(); + + for val in a { + gst_array.append_value(deserialize_serde_value(&val)?); + } + + Ok(gst_array.to_send_value()) + } + serde_json::Value::Object(_) => { + Ok(deserialize_serde_object(val, "webrtcsink-deserialized")?.to_send_value()) + } + } +} + +fn deserialize_serde_object(obj: &serde_json::Value, name: &str) -> Result { + let serde_json::Value::Object(map) = obj else { + return Err(anyhow!("not a serde object")); + }; + + let mut ret = gst::Structure::builder(name); + + for (key, value) in map { + ret = ret.field(key, deserialize_serde_value(value)?); + } + + Ok(ret.build()) +} + +fn handle_control_event( + sink: &super::BaseWebRTCSink, + msg: &str, +) -> Result { + let msg: utils::ControlRequestMessage = serde_json::from_str(msg)?; + + let event = match msg.request { + utils::ControlRequest::NavigationEvent { event } => { + gst::event::Navigation::new(event.structure()) + } + utils::ControlRequest::CustomUpstreamEvent { + structure_name, + structure, + } => { + gst::event::CustomUpstream::new(deserialize_serde_object(&structure, &structure_name)?) + } + }; + + gst::log!(CAT, obj = sink, "Processing control event: {:?}", event); + + let mut ret = false; + + if let Some(mid) = msg.mid { + let this = sink.imp(); + + let state = this.state.lock().unwrap(); + if let Some(stream_name) = state.mids.get(&mid) { + if let Some(stream) = state.streams.get(stream_name) { + if !stream.sink_pad.push_event(event.clone()) { + gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); + } else { + ret = true; + } + } + } + } else { + let this = sink.imp(); + + let state = this.state.lock().unwrap(); + state.streams.iter().for_each(|(_, stream)| { + if !stream.sink_pad.push_event(event.clone()) { + gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); + } else { + ret = true; + } + }); + } + + Ok(utils::ControlResponseMessage { + id: msg.id, + error: if ret { + None + } else { + Some("No sink pad could handle the request".to_string()) + }, + }) +} + /// Simple utility for tearing down a pipeline cleanly struct PipelineWrapper(gst::Pipeline); @@ -557,6 +665,11 @@ struct PipelineWrapper(gst::Pipeline); #[derive(Debug)] struct NavigationEventHandler((glib::SignalHandlerId, WebRTCDataChannel)); +// Structure to generate arbitrary upstream events from a WebRTCDataChannel +#[allow(dead_code)] +#[derive(Debug)] +struct ControlRequestHandler((glib::SignalHandlerId, WebRTCDataChannel)); + /// Our instance structure #[derive(Default)] pub struct BaseWebRTCSink { @@ -589,6 +702,7 @@ impl Default for Settings { do_retransmission: DEFAULT_DO_RETRANSMISSION, do_clock_signalling: DEFAULT_DO_CLOCK_SIGNALLING, enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, + enable_control_data_channel: DEFAULT_ENABLE_CONTROL_DATA_CHANNEL, meta: None, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, signaller: signaller.upcast(), @@ -622,6 +736,7 @@ impl Default for State { streams: HashMap::new(), discoveries: HashMap::new(), navigation_handler: None, + control_events_handler: None, mids: HashMap::new(), signaller_signals: Default::default(), finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), @@ -1643,6 +1758,51 @@ impl NavigationEventHandler { } } +impl ControlRequestHandler { + fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self { + let channel = webrtcbin.emit_by_name::( + "create-data-channel", + &[ + &"control", + &gst::Structure::builder("config") + .field("priority", gst_webrtc::WebRTCPriorityType::High) + .build(), + ], + ); + + Self(( + channel.connect_closure( + "on-message-string", + false, + glib::closure!( + #[watch] + element, + move |channel: &WebRTCDataChannel, msg: &str| { + match handle_control_event(element, msg) { + Err(err) => { + gst::error!(CAT, "Failed to handle control event: {err:?}"); + } + Ok(msg) => match serde_json::to_string(&msg).ok() { + Some(s) => { + channel.send_string(Some(s.as_str())); + } + None => { + gst::error!( + CAT, + obj = element, + "Failed to serialize control response", + ); + } + }, + } + } + ), + ), + channel, + )) + } +} + /// How to configure RTP extensions for payloaders, if at all enum ExtensionConfigurationType { /// Skip configuration, do not add any extensions @@ -3176,6 +3336,7 @@ impl BaseWebRTCSink { } let enable_data_channel_navigation = settings_clone.enable_data_channel_navigation; + let enable_control_data_channel = settings_clone.enable_control_data_channel; drop(settings_clone); @@ -3207,6 +3368,12 @@ impl BaseWebRTCSink { Some(NavigationEventHandler::new(&element, &webrtcbin)); } + if enable_control_data_channel { + let mut state = this.state.lock().unwrap(); + state.control_events_handler = + Some(ControlRequestHandler::new(&element, &webrtcbin)); + } + // This is intentionally emitted with the pipeline in the Ready state, // so that application code can create data channels at the correct // moment. @@ -4167,12 +4334,32 @@ impl ObjectImpl for BaseWebRTCSink { .default_value(DEFAULT_DO_CLOCK_SIGNALLING) .mutable_ready() .build(), + /** + * GstBaseWebRTCSink:enable-data-channel-navigation: + * + * Enable navigation events through a dedicated WebRTCDataChannel. + * + * Deprecated:plugins-rs-0.14.0: Use #GstBaseWebRTCSink:enable-control-data-channel + */ glib::ParamSpecBoolean::builder("enable-data-channel-navigation") .nick("Enable data channel navigation") .blurb("Enable navigation events through a dedicated WebRTCDataChannel") .default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION) .mutable_ready() .build(), + /** + * GstBaseWebRTCSink:enable-control-data-channel: + * + * Enable receiving arbitrary events through data channel. + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecBoolean::builder("enable-control-data-channel") + .nick("Enable control data channel") + .blurb("Enable receiving arbitrary events through data channel") + .default_value(DEFAULT_ENABLE_CONTROL_DATA_CHANNEL) + .mutable_ready() + .build(), glib::ParamSpecBoxed::builder::("meta") .nick("Meta") .blurb("Free form metadata about the producer") @@ -4342,6 +4529,11 @@ impl ObjectImpl for BaseWebRTCSink { settings.enable_data_channel_navigation = value.get::().expect("type checked upstream"); } + "enable-control-data-channel" => { + let mut settings = self.settings.lock().unwrap(); + settings.enable_control_data_channel = + value.get::().expect("type checked upstream"); + } "meta" => { let mut settings = self.settings.lock().unwrap(); settings.meta = value @@ -4456,6 +4648,10 @@ impl ObjectImpl for BaseWebRTCSink { let settings = self.settings.lock().unwrap(); settings.enable_data_channel_navigation.to_value() } + "enable-control-data-channel" => { + let settings = self.settings.lock().unwrap(); + settings.enable_control_data_channel.to_value() + } "stats" => self.gather_stats().to_value(), "meta" => { let settings = self.settings.lock().unwrap(); diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index a0a8d182..9c681d53 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -3,7 +3,7 @@ use gst::prelude::*; use crate::signaller::{prelude::*, Signallable, Signaller}; -use crate::utils::{Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS}; +use crate::utils::{self, Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS}; use crate::webrtcsrc::WebRTCSrcPad; use anyhow::{Context, Error}; use gst::glib; @@ -22,6 +22,7 @@ use url::Url; const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302"); const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false; +const DEFAULT_ENABLE_CONTROL_DATA_CHANNEL: bool = false; const DEFAULT_DO_RETRANSMISSION: bool = true; static CAT: Lazy = Lazy::new(|| { @@ -40,6 +41,7 @@ struct Settings { video_codecs: Vec, audio_codecs: Vec, enable_data_channel_navigation: bool, + enable_control_data_channel: bool, do_retransmission: bool, } @@ -68,62 +70,83 @@ impl ObjectImpl for BaseWebRTCSrc { fn properties() -> &'static [glib::ParamSpec] { static PROPS: Lazy> = Lazy::new(|| { vec![ - glib::ParamSpecString::builder("stun-server") - .nick("The STUN server to use") - .blurb("The STUN server of the form stun://host:port") - .flags(glib::ParamFlags::READWRITE) - .default_value(DEFAULT_STUN_SERVER) - .mutable_ready() - .build(), - gst::ParamSpecArray::builder("turn-servers") - .nick("List of TURN servers to use") - .blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">") - .element_spec(&glib::ParamSpecString::builder("turn-server") - .nick("TURN Server") - .blurb("The TURN server of the form turn(s)://username:password@host:port.") - .build() - ) - .mutable_ready() - .build(), - glib::ParamSpecObject::builder::("signaller") - .flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY) - .blurb("The Signallable object to use to handle WebRTC Signalling") - .build(), - glib::ParamSpecBoxed::builder::("meta") - .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) - .blurb("Free form metadata about the consumer") - .build(), - gst::ParamSpecArray::builder("video-codecs") - .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) - .blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]", - Codecs::video_codecs() - .map(|c| c.name.as_str()) - .join(", ") - )) - .element_spec(&glib::ParamSpecString::builder("video-codec-name").build()) - .build(), - gst::ParamSpecArray::builder("audio-codecs") - .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) - .blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]", - Codecs::audio_codecs() - .map(|c| c.name.as_str()) - .join(", ") - )) - .element_spec(&glib::ParamSpecString::builder("audio-codec-name").build()) - .build(), - glib::ParamSpecBoolean::builder("enable-data-channel-navigation") - .nick("Enable data channel navigation") - .blurb("Enable navigation events through a dedicated WebRTCDataChannel") - .default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION) - .mutable_ready() - .build(), - glib::ParamSpecBoolean::builder("do-retransmission") - .nick("Enable retransmission") - .blurb("Send retransmission events upstream when a packet is late") - .default_value(DEFAULT_DO_RETRANSMISSION) - .mutable_ready() - .build(), - ] + glib::ParamSpecString::builder("stun-server") + .nick("The STUN server to use") + .blurb("The STUN server of the form stun://host:port") + .flags(glib::ParamFlags::READWRITE) + .default_value(DEFAULT_STUN_SERVER) + .mutable_ready() + .build(), + gst::ParamSpecArray::builder("turn-servers") + .nick("List of TURN servers to use") + .blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">") + .element_spec(&glib::ParamSpecString::builder("turn-server") + .nick("TURN Server") + .blurb("The TURN server of the form turn(s)://username:password@host:port.") + .build() + ) + .mutable_ready() + .build(), + glib::ParamSpecObject::builder::("signaller") + .flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY) + .blurb("The Signallable object to use to handle WebRTC Signalling") + .build(), + glib::ParamSpecBoxed::builder::("meta") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb("Free form metadata about the consumer") + .build(), + gst::ParamSpecArray::builder("video-codecs") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]", + Codecs::video_codecs() + .map(|c| c.name.as_str()) + .join(", ") + )) + .element_spec(&glib::ParamSpecString::builder("video-codec-name").build()) + .build(), + gst::ParamSpecArray::builder("audio-codecs") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]", + Codecs::audio_codecs() + .map(|c| c.name.as_str()) + .join(", ") + )) + .element_spec(&glib::ParamSpecString::builder("audio-codec-name").build()) + .build(), + /** + * GstBaseWebRTCSrc:enable-data-channel-navigation: + * + * Enable navigation events through a dedicated WebRTCDataChannel. + * + * Deprecated:plugins-rs-0.14.0: Use #GstBaseWebRTCSrc:enable-control-data-channel + */ + glib::ParamSpecBoolean::builder("enable-data-channel-navigation") + .nick("Enable data channel navigation") + .blurb("Enable navigation events through a dedicated WebRTCDataChannel") + .default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION) + .mutable_ready() + .build(), + /** + * GstBaseWebRTCSink:enable-control-data-channel: + * + * Enable sending control requests through data channel. + * This includes but is not limited to the forwarding of navigation events. + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecBoolean::builder("enable-control-data-channel") + .nick("Enable control data channel") + .blurb("Enable sending control requests through a dedicated WebRTCDataChannel") + .default_value(DEFAULT_ENABLE_CONTROL_DATA_CHANNEL) + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("do-retransmission") + .nick("Enable retransmission") + .blurb("Send retransmission events upstream when a packet is late") + .default_value(DEFAULT_DO_RETRANSMISSION) + .mutable_ready() + .build(), + ] }); PROPS.as_ref() @@ -181,6 +204,10 @@ impl ObjectImpl for BaseWebRTCSrc { let mut settings = self.settings.lock().unwrap(); settings.enable_data_channel_navigation = value.get::().unwrap(); } + "enable-control-data-channel" => { + let mut settings = self.settings.lock().unwrap(); + settings.enable_control_data_channel = value.get::().unwrap(); + } "do-retransmission" => { let mut settings = self.settings.lock().unwrap(); settings.do_retransmission = value.get::().unwrap(); @@ -217,6 +244,10 @@ impl ObjectImpl for BaseWebRTCSrc { let settings = self.settings.lock().unwrap(); settings.enable_data_channel_navigation.to_value() } + "enable-control-data-channel" => { + let settings = self.settings.lock().unwrap(); + settings.enable_control_data_channel.to_value() + } "do-retransmission" => self.settings.lock().unwrap().do_retransmission.to_value(), name => panic!("{} getter not implemented", name), } @@ -285,6 +316,7 @@ impl Default for Settings { .cloned() .collect(), enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, + enable_control_data_channel: DEFAULT_ENABLE_CONTROL_DATA_CHANNEL, do_retransmission: DEFAULT_DO_RETRANSMISSION, } } @@ -318,6 +350,7 @@ impl Session { n_video_pads: AtomicU16::new(0), n_audio_pads: AtomicU16::new(0), flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()), + request_counter: 0, }) } @@ -391,6 +424,40 @@ impl Session { } } + fn send_control_request( + &mut self, + request: utils::ControlRequest, + element: &super::BaseWebRTCSrc, + ) { + if let Some(data_channel) = &self.data_channel.borrow_mut() { + let msg = utils::ControlRequestMessage { + id: self.request_counter, + mid: None, + request, + }; + self.request_counter += 1; + match serde_json::to_string(&msg).ok() { + Some(str) => { + gst::trace!( + CAT, + obj = element, + "Sending control request to peer for session {}", + self.id + ); + data_channel.send_string(Some(str.as_str())); + } + None => { + gst::error!( + CAT, + obj = element, + "Could not serialize navigation event for session {}", + self.id + ); + } + } + } + } + // Creates a bin which contains the webrtcbin, encoded filter (if requested) plus parser // and decoder (if needed) for every session // @@ -482,13 +549,15 @@ impl Session { )) .build(); - if element - .imp() - .settings - .lock() - .unwrap() - .enable_data_channel_navigation - { + let (enable_data_channel_navigation, enable_control_data_channel) = { + let settings = element.imp().settings.lock().unwrap(); + ( + settings.enable_data_channel_navigation, + settings.enable_control_data_channel, + ) + }; + + if enable_data_channel_navigation || enable_control_data_channel { webrtcbin_pad.add_probe( gst::PadProbeType::EVENT_UPSTREAM, glib::clone!( @@ -508,10 +577,18 @@ impl Session { let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(&sess_id) { - session.send_navigation_event( - gst_video::NavigationEvent::parse(ev).unwrap(), - &element, - ); + if enable_data_channel_navigation { + session.send_navigation_event( + gst_video::NavigationEvent::parse(ev).unwrap(), + &element, + ); + } + if enable_control_data_channel { + let request = utils::ControlRequest::NavigationEvent { + event: gst_video::NavigationEvent::parse(ev).unwrap(), + }; + session.send_control_request(request, &element); + } } else { gst::error!(CAT, obj = element, "session {sess_id:?} does not exist"); } @@ -1432,8 +1509,15 @@ impl ElementImpl for BaseWebRTCSrc { fn send_event(&self, event: gst::Event) -> bool { match event.view() { gst::EventView::Navigation(ev) => { + let settings = self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); + // Return without potentially warning + if !settings.enable_data_channel_navigation && !settings.enable_control_data_channel + { + return true; + } + if state.sessions.len() != 1 { gst::warning!( CAT, @@ -1444,15 +1528,29 @@ impl ElementImpl for BaseWebRTCSrc { ); false } else { - state - .sessions - .values_mut() - .next() - .unwrap() - .send_navigation_event( - gst_video::NavigationEvent::parse(ev).unwrap(), - &self.obj(), - ); + if settings.enable_data_channel_navigation { + state + .sessions + .values_mut() + .next() + .unwrap() + .send_navigation_event( + gst_video::NavigationEvent::parse(ev).unwrap(), + &self.obj(), + ); + } + + if settings.enable_control_data_channel { + let request = utils::ControlRequest::NavigationEvent { + event: gst_video::NavigationEvent::parse(ev).unwrap(), + }; + state + .sessions + .values_mut() + .next() + .unwrap() + .send_control_request(request, &self.obj()); + } true } } @@ -1502,6 +1600,7 @@ struct Session { n_video_pads: AtomicU16, n_audio_pads: AtomicU16, flow_combiner: Mutex, + request_counter: u64, } struct State { sessions: HashMap,