diff --git a/net/webrtc/README.md b/net/webrtc/README.md index edced0be..d2dab0ea 100644 --- a/net/webrtc/README.md +++ b/net/webrtc/README.md @@ -160,7 +160,7 @@ gst-launch-1.0 webrtcsink signaller::uri="ws://127.0.0.1:8443" .. `webrtcsink` implements the [`GstNavigation`] interface which allows interacting with the content, for example move with your mouse, entering keys with the keyboard, etc... On top of that a `WebRTCDataChannel` based protocol has been -implemented and can be activated with the `enable-data-channel-navigation=true` +implemented and can be activated with the `enable-control-data-channel=true` property allowing a client to send GstNavigation events using the WebRTC data channel. The [gstwebrtc-api](gstwebrtc-api) and `webrtcsrc` implement the protocol as well @@ -170,7 +170,7 @@ You can easily test this feature using the [`wpesrc`] element with the following that will start a server that allows you to navigate the GStreamer documentation: ``` shell -gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation/ ! queue ! webrtcsink enable-data-channel-navigation=true meta="meta,name=web-stream" +gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation/ ! queue ! webrtcsink enable-control-data-channel=true meta="meta,name=web-stream" ``` You can control it inside the video running within your web browser (at @@ -178,7 +178,7 @@ https://127.0.0.1:9090 if you followed previous steps in that readme) or with the following GSteamer pipeline as a client: ``` shell -gst-launch-1.0 webrtcsrc signaller::producer-peer-id= enable-data-channel-navigation=true ! videoconvert ! autovideosink +gst-launch-1.0 webrtcsrc signaller::producer-peer-id= enable-control-data-channel=true ! videoconvert ! autovideosink ``` ### Sending HTTP headers diff --git a/net/webrtc/gstwebrtc-api/README.md b/net/webrtc/gstwebrtc-api/README.md index fada157e..5cd8c915 100644 --- a/net/webrtc/gstwebrtc-api/README.md +++ b/net/webrtc/gstwebrtc-api/README.md @@ -135,7 +135,7 @@ You just need to click on the corresponding entry to connect as a consumer to th Launch the following GStreamer pipeline: ```shell -$ gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation ! queue ! webrtcsink enable-data-channel-navigation=true meta="meta,name=web-stream" +$ gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation ! queue ! webrtcsink enable-control-data-channel=true meta="meta,name=web-stream" ``` Once the GStreamer pipeline launched, you will see a new producer with the name *web-stream*. When connecting to this diff --git a/net/webrtc/gstwebrtc-api/src/consumer-session.js b/net/webrtc/gstwebrtc-api/src/consumer-session.js index b4ddd35d..d92c7a3b 100644 --- a/net/webrtc/gstwebrtc-api/src/consumer-session.js +++ b/net/webrtc/gstwebrtc-api/src/consumer-session.js @@ -197,7 +197,7 @@ export default class ConsumerSession extends WebRTCSession { connection.ondatachannel = (event) => { const rtcDataChannel = event.channel; - if (rtcDataChannel && (rtcDataChannel.label === "input")) { + if (rtcDataChannel && (rtcDataChannel.label === "control")) { if (this._remoteController) { const previousController = this._remoteController; this._remoteController = null; diff --git a/net/webrtc/gstwebrtc-api/src/remote-controller.js b/net/webrtc/gstwebrtc-api/src/remote-controller.js index 939e40e6..50c06420 100644 --- a/net/webrtc/gstwebrtc-api/src/remote-controller.js +++ b/net/webrtc/gstwebrtc-api/src/remote-controller.js @@ -93,6 +93,7 @@ export default class RemoteController extends EventTarget { this._videoElementComputedStyle = null; this._videoElementKeyboard = null; this._lastTouchEventTimestamp = 0; + this._requestCounter = 0; rtcDataChannel.addEventListener("close", () => { if (this._rtcDataChannel === rtcDataChannel) { @@ -109,8 +110,24 @@ export default class RemoteController extends EventTarget { })); } }); + + rtcDataChannel.addEventListener("message", (event) => { + try { + const msg = JSON.parse(event.data); + + if (msg.type === "ControlResponseMessage") { + this.dispatchEvent(new CustomEvent("controlResponse", { detail: msg })); + } + } catch (ex) { + this.dispatchEvent(new ErrorEvent("error", { + message: "cannot parse control message from signaling server", + error: ex + })); + } + }); } + /** * The underlying WebRTC data channel connected to a remote GStreamer webrtcsink producer offering remote control. * The value may be null if the remote controller has been closed. @@ -179,6 +196,42 @@ export default class RemoteController extends EventTarget { } } + /** + * Send a request over the control data channel.
+ * + * @method GstWebRTCAPI.RemoteController#sendControlRequest + * @fires {@link GstWebRTCAPI#event:ErrorEvent} + * @param {object} request - The request to stringify and send over the channel + * @param {string} request.type - The type of the request + * @returns {number} The identifier attributed to the request, or -1 if an exception occurred + */ + sendControlRequest(request) { + try { + if (!request || (typeof (request) !== "object")) { + throw new Error("invalid request"); + } + + if (!this._rtcDataChannel) { + throw new Error("remote controller data channel is closed"); + } + + let message = { + id: this._requestCounter++, + request: request + }; + + this._rtcDataChannel.send(JSON.stringify(message)); + + return message.id; + } catch (ex) { + this.dispatchEvent(new ErrorEvent("error", { + message: `cannot send control message over session ${this._consumerSession.sessionId} remote controller`, + error: ex + })); + return -1; + } + } + /** * Closes the remote controller channel.
* It immediately shuts down the underlying WebRTC data channel connected to a remote GStreamer webrtcsink @@ -198,22 +251,11 @@ export default class RemoteController extends EventTarget { } _sendGstNavigationEvent(data) { - try { - if (!data || (typeof (data) !== "object")) { - throw new Error("invalid GstNavigation event"); - } - - if (!this._rtcDataChannel) { - throw new Error("remote controller data channel is closed"); - } - - this._rtcDataChannel.send(JSON.stringify(data)); - } catch (ex) { - this.dispatchEvent(new ErrorEvent("error", { - message: `cannot send GstNavigation event over session ${this._consumerSession.sessionId} remote controller`, - error: ex - })); - } + let request = { + type: "navigationEvent", + event: data + }; + this.sendControlRequest(request); } _computeVideoMousePosition(event) { diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index e95a0929..b0005682 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -999,13 +999,45 @@ pub fn cleanup_codec_caps(mut caps: gst::Caps) -> gst::Caps { caps } -#[derive(Debug, serde::Deserialize, serde::Serialize)] +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Deserialize, Serialize)] pub struct NavigationEvent { pub mid: Option, #[serde(flatten)] pub event: gst_video::NavigationEvent, } +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(tag = "type")] +#[serde(rename_all = "camelCase")] +pub enum ControlRequest { + NavigationEvent { + event: gst_video::NavigationEvent, + }, + #[serde(rename_all = "camelCase")] + CustomUpstreamEvent { + structure_name: String, + structure: serde_json::Value, + }, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub struct ControlRequestMessage { + pub id: u64, + pub mid: Option, + pub request: ControlRequest, +} + +#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub struct ControlResponseMessage { + pub id: u64, + pub error: Option, +} + pub fn find_smallest_available_ext_id(ids: impl IntoIterator) -> u32 { let used_numbers: HashSet<_> = ids.into_iter().collect(); (1..).find(|&num| !used_numbers.contains(&num)).unwrap() diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 3100007c..fda02267 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -70,6 +70,7 @@ const DEFAULT_DO_FEC: bool = true; const DEFAULT_DO_RETRANSMISSION: bool = true; const DEFAULT_DO_CLOCK_SIGNALLING: bool = false; const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false; +const DEFAULT_ENABLE_CONTROL_DATA_CHANNEL: bool = false; const DEFAULT_ICE_TRANSPORT_POLICY: WebRTCICETransportPolicy = WebRTCICETransportPolicy::All; const DEFAULT_START_BITRATE: u32 = 2048000; #[cfg(feature = "web_server")] @@ -109,6 +110,7 @@ struct Settings { do_retransmission: bool, do_clock_signalling: bool, enable_data_channel_navigation: bool, + enable_control_data_channel: bool, meta: Option, ice_transport_policy: WebRTCICETransportPolicy, signaller: Signallable, @@ -501,6 +503,7 @@ struct State { streams: HashMap, discoveries: HashMap>, navigation_handler: Option, + control_events_handler: Option, mids: HashMap, signaller_signals: Option, finalizing_sessions: Arc<(Mutex>, Condvar)>, @@ -548,6 +551,111 @@ fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) { } } +fn deserialize_serde_value(val: &serde_json::Value) -> Result { + match val { + serde_json::Value::Null => { + return Err(anyhow!("Untyped null values are not handled")); + } + serde_json::Value::Bool(v) => Ok(v.to_send_value()), + serde_json::Value::Number(v) => { + if let Some(v) = v.as_i64() { + Ok(v.to_send_value()) + } else if let Some(v) = v.as_u64() { + Ok(v.to_send_value()) + } else if let Some(v) = v.as_f64() { + Ok(v.to_send_value()) + } else { + unreachable!() + } + } + serde_json::Value::String(v) => Ok(v.to_send_value()), + serde_json::Value::Array(a) => { + let mut gst_array = gst::Array::default(); + + for val in a { + gst_array.append_value(deserialize_serde_value(&val)?); + } + + Ok(gst_array.to_send_value()) + } + serde_json::Value::Object(_) => { + Ok(deserialize_serde_object(val, "webrtcsink-deserialized")?.to_send_value()) + } + } +} + +fn deserialize_serde_object(obj: &serde_json::Value, name: &str) -> Result { + let serde_json::Value::Object(map) = obj else { + return Err(anyhow!("not a serde object")); + }; + + let mut ret = gst::Structure::builder(name); + + for (key, value) in map { + ret = ret.field(key, deserialize_serde_value(value)?); + } + + Ok(ret.build()) +} + +fn handle_control_event( + sink: &super::BaseWebRTCSink, + msg: &str, +) -> Result { + let msg: utils::ControlRequestMessage = serde_json::from_str(msg)?; + + let event = match msg.request { + utils::ControlRequest::NavigationEvent { event } => { + gst::event::Navigation::new(event.structure()) + } + utils::ControlRequest::CustomUpstreamEvent { + structure_name, + structure, + } => { + gst::event::CustomUpstream::new(deserialize_serde_object(&structure, &structure_name)?) + } + }; + + gst::log!(CAT, obj = sink, "Processing control event: {:?}", event); + + let mut ret = false; + + if let Some(mid) = msg.mid { + let this = sink.imp(); + + let state = this.state.lock().unwrap(); + if let Some(stream_name) = state.mids.get(&mid) { + if let Some(stream) = state.streams.get(stream_name) { + if !stream.sink_pad.push_event(event.clone()) { + gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); + } else { + ret = true; + } + } + } + } else { + let this = sink.imp(); + + let state = this.state.lock().unwrap(); + state.streams.iter().for_each(|(_, stream)| { + if !stream.sink_pad.push_event(event.clone()) { + gst::info!(CAT, obj = sink, "Could not send event: {:?}", event); + } else { + ret = true; + } + }); + } + + Ok(utils::ControlResponseMessage { + id: msg.id, + error: if ret { + None + } else { + Some("No sink pad could handle the request".to_string()) + }, + }) +} + /// Simple utility for tearing down a pipeline cleanly struct PipelineWrapper(gst::Pipeline); @@ -557,6 +665,11 @@ struct PipelineWrapper(gst::Pipeline); #[derive(Debug)] struct NavigationEventHandler((glib::SignalHandlerId, WebRTCDataChannel)); +// Structure to generate arbitrary upstream events from a WebRTCDataChannel +#[allow(dead_code)] +#[derive(Debug)] +struct ControlRequestHandler((glib::SignalHandlerId, WebRTCDataChannel)); + /// Our instance structure #[derive(Default)] pub struct BaseWebRTCSink { @@ -589,6 +702,7 @@ impl Default for Settings { do_retransmission: DEFAULT_DO_RETRANSMISSION, do_clock_signalling: DEFAULT_DO_CLOCK_SIGNALLING, enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, + enable_control_data_channel: DEFAULT_ENABLE_CONTROL_DATA_CHANNEL, meta: None, ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY, signaller: signaller.upcast(), @@ -622,6 +736,7 @@ impl Default for State { streams: HashMap::new(), discoveries: HashMap::new(), navigation_handler: None, + control_events_handler: None, mids: HashMap::new(), signaller_signals: Default::default(), finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())), @@ -1643,6 +1758,51 @@ impl NavigationEventHandler { } } +impl ControlRequestHandler { + fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self { + let channel = webrtcbin.emit_by_name::( + "create-data-channel", + &[ + &"control", + &gst::Structure::builder("config") + .field("priority", gst_webrtc::WebRTCPriorityType::High) + .build(), + ], + ); + + Self(( + channel.connect_closure( + "on-message-string", + false, + glib::closure!( + #[watch] + element, + move |channel: &WebRTCDataChannel, msg: &str| { + match handle_control_event(element, msg) { + Err(err) => { + gst::error!(CAT, "Failed to handle control event: {err:?}"); + } + Ok(msg) => match serde_json::to_string(&msg).ok() { + Some(s) => { + channel.send_string(Some(s.as_str())); + } + None => { + gst::error!( + CAT, + obj = element, + "Failed to serialize control response", + ); + } + }, + } + } + ), + ), + channel, + )) + } +} + /// How to configure RTP extensions for payloaders, if at all enum ExtensionConfigurationType { /// Skip configuration, do not add any extensions @@ -3176,6 +3336,7 @@ impl BaseWebRTCSink { } let enable_data_channel_navigation = settings_clone.enable_data_channel_navigation; + let enable_control_data_channel = settings_clone.enable_control_data_channel; drop(settings_clone); @@ -3207,6 +3368,12 @@ impl BaseWebRTCSink { Some(NavigationEventHandler::new(&element, &webrtcbin)); } + if enable_control_data_channel { + let mut state = this.state.lock().unwrap(); + state.control_events_handler = + Some(ControlRequestHandler::new(&element, &webrtcbin)); + } + // This is intentionally emitted with the pipeline in the Ready state, // so that application code can create data channels at the correct // moment. @@ -4167,12 +4334,32 @@ impl ObjectImpl for BaseWebRTCSink { .default_value(DEFAULT_DO_CLOCK_SIGNALLING) .mutable_ready() .build(), + /** + * GstBaseWebRTCSink:enable-data-channel-navigation: + * + * Enable navigation events through a dedicated WebRTCDataChannel. + * + * Deprecated:plugins-rs-0.14.0: Use #GstBaseWebRTCSink:enable-control-data-channel + */ glib::ParamSpecBoolean::builder("enable-data-channel-navigation") .nick("Enable data channel navigation") .blurb("Enable navigation events through a dedicated WebRTCDataChannel") .default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION) .mutable_ready() .build(), + /** + * GstBaseWebRTCSink:enable-control-data-channel: + * + * Enable receiving arbitrary events through data channel. + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecBoolean::builder("enable-control-data-channel") + .nick("Enable control data channel") + .blurb("Enable receiving arbitrary events through data channel") + .default_value(DEFAULT_ENABLE_CONTROL_DATA_CHANNEL) + .mutable_ready() + .build(), glib::ParamSpecBoxed::builder::("meta") .nick("Meta") .blurb("Free form metadata about the producer") @@ -4342,6 +4529,11 @@ impl ObjectImpl for BaseWebRTCSink { settings.enable_data_channel_navigation = value.get::().expect("type checked upstream"); } + "enable-control-data-channel" => { + let mut settings = self.settings.lock().unwrap(); + settings.enable_control_data_channel = + value.get::().expect("type checked upstream"); + } "meta" => { let mut settings = self.settings.lock().unwrap(); settings.meta = value @@ -4456,6 +4648,10 @@ impl ObjectImpl for BaseWebRTCSink { let settings = self.settings.lock().unwrap(); settings.enable_data_channel_navigation.to_value() } + "enable-control-data-channel" => { + let settings = self.settings.lock().unwrap(); + settings.enable_control_data_channel.to_value() + } "stats" => self.gather_stats().to_value(), "meta" => { let settings = self.settings.lock().unwrap(); diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index a0a8d182..9c681d53 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -3,7 +3,7 @@ use gst::prelude::*; use crate::signaller::{prelude::*, Signallable, Signaller}; -use crate::utils::{Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS}; +use crate::utils::{self, Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS}; use crate::webrtcsrc::WebRTCSrcPad; use anyhow::{Context, Error}; use gst::glib; @@ -22,6 +22,7 @@ use url::Url; const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302"); const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false; +const DEFAULT_ENABLE_CONTROL_DATA_CHANNEL: bool = false; const DEFAULT_DO_RETRANSMISSION: bool = true; static CAT: Lazy = Lazy::new(|| { @@ -40,6 +41,7 @@ struct Settings { video_codecs: Vec, audio_codecs: Vec, enable_data_channel_navigation: bool, + enable_control_data_channel: bool, do_retransmission: bool, } @@ -68,62 +70,83 @@ impl ObjectImpl for BaseWebRTCSrc { fn properties() -> &'static [glib::ParamSpec] { static PROPS: Lazy> = Lazy::new(|| { vec![ - glib::ParamSpecString::builder("stun-server") - .nick("The STUN server to use") - .blurb("The STUN server of the form stun://host:port") - .flags(glib::ParamFlags::READWRITE) - .default_value(DEFAULT_STUN_SERVER) - .mutable_ready() - .build(), - gst::ParamSpecArray::builder("turn-servers") - .nick("List of TURN servers to use") - .blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">") - .element_spec(&glib::ParamSpecString::builder("turn-server") - .nick("TURN Server") - .blurb("The TURN server of the form turn(s)://username:password@host:port.") - .build() - ) - .mutable_ready() - .build(), - glib::ParamSpecObject::builder::("signaller") - .flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY) - .blurb("The Signallable object to use to handle WebRTC Signalling") - .build(), - glib::ParamSpecBoxed::builder::("meta") - .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) - .blurb("Free form metadata about the consumer") - .build(), - gst::ParamSpecArray::builder("video-codecs") - .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) - .blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]", - Codecs::video_codecs() - .map(|c| c.name.as_str()) - .join(", ") - )) - .element_spec(&glib::ParamSpecString::builder("video-codec-name").build()) - .build(), - gst::ParamSpecArray::builder("audio-codecs") - .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) - .blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]", - Codecs::audio_codecs() - .map(|c| c.name.as_str()) - .join(", ") - )) - .element_spec(&glib::ParamSpecString::builder("audio-codec-name").build()) - .build(), - glib::ParamSpecBoolean::builder("enable-data-channel-navigation") - .nick("Enable data channel navigation") - .blurb("Enable navigation events through a dedicated WebRTCDataChannel") - .default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION) - .mutable_ready() - .build(), - glib::ParamSpecBoolean::builder("do-retransmission") - .nick("Enable retransmission") - .blurb("Send retransmission events upstream when a packet is late") - .default_value(DEFAULT_DO_RETRANSMISSION) - .mutable_ready() - .build(), - ] + glib::ParamSpecString::builder("stun-server") + .nick("The STUN server to use") + .blurb("The STUN server of the form stun://host:port") + .flags(glib::ParamFlags::READWRITE) + .default_value(DEFAULT_STUN_SERVER) + .mutable_ready() + .build(), + gst::ParamSpecArray::builder("turn-servers") + .nick("List of TURN servers to use") + .blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">") + .element_spec(&glib::ParamSpecString::builder("turn-server") + .nick("TURN Server") + .blurb("The TURN server of the form turn(s)://username:password@host:port.") + .build() + ) + .mutable_ready() + .build(), + glib::ParamSpecObject::builder::("signaller") + .flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY) + .blurb("The Signallable object to use to handle WebRTC Signalling") + .build(), + glib::ParamSpecBoxed::builder::("meta") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb("Free form metadata about the consumer") + .build(), + gst::ParamSpecArray::builder("video-codecs") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]", + Codecs::video_codecs() + .map(|c| c.name.as_str()) + .join(", ") + )) + .element_spec(&glib::ParamSpecString::builder("video-codec-name").build()) + .build(), + gst::ParamSpecArray::builder("audio-codecs") + .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) + .blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]", + Codecs::audio_codecs() + .map(|c| c.name.as_str()) + .join(", ") + )) + .element_spec(&glib::ParamSpecString::builder("audio-codec-name").build()) + .build(), + /** + * GstBaseWebRTCSrc:enable-data-channel-navigation: + * + * Enable navigation events through a dedicated WebRTCDataChannel. + * + * Deprecated:plugins-rs-0.14.0: Use #GstBaseWebRTCSrc:enable-control-data-channel + */ + glib::ParamSpecBoolean::builder("enable-data-channel-navigation") + .nick("Enable data channel navigation") + .blurb("Enable navigation events through a dedicated WebRTCDataChannel") + .default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION) + .mutable_ready() + .build(), + /** + * GstBaseWebRTCSink:enable-control-data-channel: + * + * Enable sending control requests through data channel. + * This includes but is not limited to the forwarding of navigation events. + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecBoolean::builder("enable-control-data-channel") + .nick("Enable control data channel") + .blurb("Enable sending control requests through a dedicated WebRTCDataChannel") + .default_value(DEFAULT_ENABLE_CONTROL_DATA_CHANNEL) + .mutable_ready() + .build(), + glib::ParamSpecBoolean::builder("do-retransmission") + .nick("Enable retransmission") + .blurb("Send retransmission events upstream when a packet is late") + .default_value(DEFAULT_DO_RETRANSMISSION) + .mutable_ready() + .build(), + ] }); PROPS.as_ref() @@ -181,6 +204,10 @@ impl ObjectImpl for BaseWebRTCSrc { let mut settings = self.settings.lock().unwrap(); settings.enable_data_channel_navigation = value.get::().unwrap(); } + "enable-control-data-channel" => { + let mut settings = self.settings.lock().unwrap(); + settings.enable_control_data_channel = value.get::().unwrap(); + } "do-retransmission" => { let mut settings = self.settings.lock().unwrap(); settings.do_retransmission = value.get::().unwrap(); @@ -217,6 +244,10 @@ impl ObjectImpl for BaseWebRTCSrc { let settings = self.settings.lock().unwrap(); settings.enable_data_channel_navigation.to_value() } + "enable-control-data-channel" => { + let settings = self.settings.lock().unwrap(); + settings.enable_control_data_channel.to_value() + } "do-retransmission" => self.settings.lock().unwrap().do_retransmission.to_value(), name => panic!("{} getter not implemented", name), } @@ -285,6 +316,7 @@ impl Default for Settings { .cloned() .collect(), enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, + enable_control_data_channel: DEFAULT_ENABLE_CONTROL_DATA_CHANNEL, do_retransmission: DEFAULT_DO_RETRANSMISSION, } } @@ -318,6 +350,7 @@ impl Session { n_video_pads: AtomicU16::new(0), n_audio_pads: AtomicU16::new(0), flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()), + request_counter: 0, }) } @@ -391,6 +424,40 @@ impl Session { } } + fn send_control_request( + &mut self, + request: utils::ControlRequest, + element: &super::BaseWebRTCSrc, + ) { + if let Some(data_channel) = &self.data_channel.borrow_mut() { + let msg = utils::ControlRequestMessage { + id: self.request_counter, + mid: None, + request, + }; + self.request_counter += 1; + match serde_json::to_string(&msg).ok() { + Some(str) => { + gst::trace!( + CAT, + obj = element, + "Sending control request to peer for session {}", + self.id + ); + data_channel.send_string(Some(str.as_str())); + } + None => { + gst::error!( + CAT, + obj = element, + "Could not serialize navigation event for session {}", + self.id + ); + } + } + } + } + // Creates a bin which contains the webrtcbin, encoded filter (if requested) plus parser // and decoder (if needed) for every session // @@ -482,13 +549,15 @@ impl Session { )) .build(); - if element - .imp() - .settings - .lock() - .unwrap() - .enable_data_channel_navigation - { + let (enable_data_channel_navigation, enable_control_data_channel) = { + let settings = element.imp().settings.lock().unwrap(); + ( + settings.enable_data_channel_navigation, + settings.enable_control_data_channel, + ) + }; + + if enable_data_channel_navigation || enable_control_data_channel { webrtcbin_pad.add_probe( gst::PadProbeType::EVENT_UPSTREAM, glib::clone!( @@ -508,10 +577,18 @@ impl Session { let mut state = element.imp().state.lock().unwrap(); if let Some(session) = state.sessions.get_mut(&sess_id) { - session.send_navigation_event( - gst_video::NavigationEvent::parse(ev).unwrap(), - &element, - ); + if enable_data_channel_navigation { + session.send_navigation_event( + gst_video::NavigationEvent::parse(ev).unwrap(), + &element, + ); + } + if enable_control_data_channel { + let request = utils::ControlRequest::NavigationEvent { + event: gst_video::NavigationEvent::parse(ev).unwrap(), + }; + session.send_control_request(request, &element); + } } else { gst::error!(CAT, obj = element, "session {sess_id:?} does not exist"); } @@ -1432,8 +1509,15 @@ impl ElementImpl for BaseWebRTCSrc { fn send_event(&self, event: gst::Event) -> bool { match event.view() { gst::EventView::Navigation(ev) => { + let settings = self.settings.lock().unwrap(); let mut state = self.state.lock().unwrap(); + // Return without potentially warning + if !settings.enable_data_channel_navigation && !settings.enable_control_data_channel + { + return true; + } + if state.sessions.len() != 1 { gst::warning!( CAT, @@ -1444,15 +1528,29 @@ impl ElementImpl for BaseWebRTCSrc { ); false } else { - state - .sessions - .values_mut() - .next() - .unwrap() - .send_navigation_event( - gst_video::NavigationEvent::parse(ev).unwrap(), - &self.obj(), - ); + if settings.enable_data_channel_navigation { + state + .sessions + .values_mut() + .next() + .unwrap() + .send_navigation_event( + gst_video::NavigationEvent::parse(ev).unwrap(), + &self.obj(), + ); + } + + if settings.enable_control_data_channel { + let request = utils::ControlRequest::NavigationEvent { + event: gst_video::NavigationEvent::parse(ev).unwrap(), + }; + state + .sessions + .values_mut() + .next() + .unwrap() + .send_control_request(request, &self.obj()); + } true } } @@ -1502,6 +1600,7 @@ struct Session { n_video_pads: AtomicU16, n_audio_pads: AtomicU16, flow_combiner: Mutex, + request_counter: u64, } struct State { sessions: HashMap,