webrtcsink: implement generic data channel control mechanism ..

.. and deprecate data channel navigation in favor of it.

A new property, "enable-data-channel-control" is exposed, when set to
TRUE a control data channel is offered, over which can be sent typed
upstream events.

This means further upstream events will be usable, for now only
navigation and custom upstream events are handled.

In addition, send response messages to notify the consumer of whether
its requests have been handled.

In the future this can also be extended to allow the consumer to send
queries, or seek events ..

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1711>
This commit is contained in:
Mathieu Duponchelle 2024-08-13 17:20:27 +02:00 committed by GStreamer Marge Bot
parent 0a4dc29efe
commit 01e28ddfe2
7 changed files with 468 additions and 99 deletions

View file

@ -160,7 +160,7 @@ gst-launch-1.0 webrtcsink signaller::uri="ws://127.0.0.1:8443" ..
`webrtcsink` implements the [`GstNavigation`] interface which allows interacting
with the content, for example move with your mouse, entering keys with the
keyboard, etc... On top of that a `WebRTCDataChannel` based protocol has been
implemented and can be activated with the `enable-data-channel-navigation=true`
implemented and can be activated with the `enable-control-data-channel=true`
property allowing a client to send GstNavigation events using the WebRTC data channel.
The [gstwebrtc-api](gstwebrtc-api) and `webrtcsrc` implement the protocol as well
@ -170,7 +170,7 @@ You can easily test this feature using the [`wpesrc`] element with the following
that will start a server that allows you to navigate the GStreamer documentation:
``` shell
gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation/ ! queue ! webrtcsink enable-data-channel-navigation=true meta="meta,name=web-stream"
gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation/ ! queue ! webrtcsink enable-control-data-channel=true meta="meta,name=web-stream"
```
You can control it inside the video running within your web browser (at
@ -178,7 +178,7 @@ https://127.0.0.1:9090 if you followed previous steps in that readme) or
with the following GSteamer pipeline as a client:
``` shell
gst-launch-1.0 webrtcsrc signaller::producer-peer-id=<webrtcsink-peer-id> enable-data-channel-navigation=true ! videoconvert ! autovideosink
gst-launch-1.0 webrtcsrc signaller::producer-peer-id=<webrtcsink-peer-id> enable-control-data-channel=true ! videoconvert ! autovideosink
```
### Sending HTTP headers

View file

@ -135,7 +135,7 @@ You just need to click on the corresponding entry to connect as a consumer to th
Launch the following GStreamer pipeline:
```shell
$ gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation ! queue ! webrtcsink enable-data-channel-navigation=true meta="meta,name=web-stream"
$ gst-launch-1.0 wpesrc location=https://gstreamer.freedesktop.org/documentation ! queue ! webrtcsink enable-control-data-channel=true meta="meta,name=web-stream"
```
Once the GStreamer pipeline launched, you will see a new producer with the name *web-stream*. When connecting to this

View file

@ -197,7 +197,7 @@ export default class ConsumerSession extends WebRTCSession {
connection.ondatachannel = (event) => {
const rtcDataChannel = event.channel;
if (rtcDataChannel && (rtcDataChannel.label === "input")) {
if (rtcDataChannel && (rtcDataChannel.label === "control")) {
if (this._remoteController) {
const previousController = this._remoteController;
this._remoteController = null;

View file

@ -93,6 +93,7 @@ export default class RemoteController extends EventTarget {
this._videoElementComputedStyle = null;
this._videoElementKeyboard = null;
this._lastTouchEventTimestamp = 0;
this._requestCounter = 0;
rtcDataChannel.addEventListener("close", () => {
if (this._rtcDataChannel === rtcDataChannel) {
@ -109,8 +110,24 @@ export default class RemoteController extends EventTarget {
}));
}
});
rtcDataChannel.addEventListener("message", (event) => {
try {
const msg = JSON.parse(event.data);
if (msg.type === "ControlResponseMessage") {
this.dispatchEvent(new CustomEvent("controlResponse", { detail: msg }));
}
} catch (ex) {
this.dispatchEvent(new ErrorEvent("error", {
message: "cannot parse control message from signaling server",
error: ex
}));
}
});
}
/**
* The underlying WebRTC data channel connected to a remote GStreamer webrtcsink producer offering remote control.
* The value may be null if the remote controller has been closed.
@ -179,6 +196,42 @@ export default class RemoteController extends EventTarget {
}
}
/**
* Send a request over the control data channel.<br>
*
* @method GstWebRTCAPI.RemoteController#sendControlRequest
* @fires {@link GstWebRTCAPI#event:ErrorEvent}
* @param {object} request - The request to stringify and send over the channel
* @param {string} request.type - The type of the request
* @returns {number} The identifier attributed to the request, or -1 if an exception occurred
*/
sendControlRequest(request) {
try {
if (!request || (typeof (request) !== "object")) {
throw new Error("invalid request");
}
if (!this._rtcDataChannel) {
throw new Error("remote controller data channel is closed");
}
let message = {
id: this._requestCounter++,
request: request
};
this._rtcDataChannel.send(JSON.stringify(message));
return message.id;
} catch (ex) {
this.dispatchEvent(new ErrorEvent("error", {
message: `cannot send control message over session ${this._consumerSession.sessionId} remote controller`,
error: ex
}));
return -1;
}
}
/**
* Closes the remote controller channel.<br>
* It immediately shuts down the underlying WebRTC data channel connected to a remote GStreamer webrtcsink
@ -198,22 +251,11 @@ export default class RemoteController extends EventTarget {
}
_sendGstNavigationEvent(data) {
try {
if (!data || (typeof (data) !== "object")) {
throw new Error("invalid GstNavigation event");
}
if (!this._rtcDataChannel) {
throw new Error("remote controller data channel is closed");
}
this._rtcDataChannel.send(JSON.stringify(data));
} catch (ex) {
this.dispatchEvent(new ErrorEvent("error", {
message: `cannot send GstNavigation event over session ${this._consumerSession.sessionId} remote controller`,
error: ex
}));
}
let request = {
type: "navigationEvent",
event: data
};
this.sendControlRequest(request);
}
_computeVideoMousePosition(event) {

View file

@ -999,13 +999,45 @@ pub fn cleanup_codec_caps(mut caps: gst::Caps) -> gst::Caps {
caps
}
#[derive(Debug, serde::Deserialize, serde::Serialize)]
use serde::{Deserialize, Serialize};
#[derive(Debug, Deserialize, Serialize)]
pub struct NavigationEvent {
pub mid: Option<String>,
#[serde(flatten)]
pub event: gst_video::NavigationEvent,
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "type")]
#[serde(rename_all = "camelCase")]
pub enum ControlRequest {
NavigationEvent {
event: gst_video::NavigationEvent,
},
#[serde(rename_all = "camelCase")]
CustomUpstreamEvent {
structure_name: String,
structure: serde_json::Value,
},
}
#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub struct ControlRequestMessage {
pub id: u64,
pub mid: Option<String>,
pub request: ControlRequest,
}
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub struct ControlResponseMessage {
pub id: u64,
pub error: Option<String>,
}
pub fn find_smallest_available_ext_id(ids: impl IntoIterator<Item = u32>) -> u32 {
let used_numbers: HashSet<_> = ids.into_iter().collect();
(1..).find(|&num| !used_numbers.contains(&num)).unwrap()

View file

@ -70,6 +70,7 @@ const DEFAULT_DO_FEC: bool = true;
const DEFAULT_DO_RETRANSMISSION: bool = true;
const DEFAULT_DO_CLOCK_SIGNALLING: bool = false;
const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false;
const DEFAULT_ENABLE_CONTROL_DATA_CHANNEL: bool = false;
const DEFAULT_ICE_TRANSPORT_POLICY: WebRTCICETransportPolicy = WebRTCICETransportPolicy::All;
const DEFAULT_START_BITRATE: u32 = 2048000;
#[cfg(feature = "web_server")]
@ -109,6 +110,7 @@ struct Settings {
do_retransmission: bool,
do_clock_signalling: bool,
enable_data_channel_navigation: bool,
enable_control_data_channel: bool,
meta: Option<gst::Structure>,
ice_transport_policy: WebRTCICETransportPolicy,
signaller: Signallable,
@ -501,6 +503,7 @@ struct State {
streams: HashMap<String, InputStream>,
discoveries: HashMap<String, Vec<DiscoveryInfo>>,
navigation_handler: Option<NavigationEventHandler>,
control_events_handler: Option<ControlRequestHandler>,
mids: HashMap<String, String>,
signaller_signals: Option<SignallerSignals>,
finalizing_sessions: Arc<(Mutex<HashSet<String>>, Condvar)>,
@ -548,6 +551,111 @@ fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str) {
}
}
fn deserialize_serde_value(val: &serde_json::Value) -> Result<gst::glib::SendValue, Error> {
match val {
serde_json::Value::Null => {
return Err(anyhow!("Untyped null values are not handled"));
}
serde_json::Value::Bool(v) => Ok(v.to_send_value()),
serde_json::Value::Number(v) => {
if let Some(v) = v.as_i64() {
Ok(v.to_send_value())
} else if let Some(v) = v.as_u64() {
Ok(v.to_send_value())
} else if let Some(v) = v.as_f64() {
Ok(v.to_send_value())
} else {
unreachable!()
}
}
serde_json::Value::String(v) => Ok(v.to_send_value()),
serde_json::Value::Array(a) => {
let mut gst_array = gst::Array::default();
for val in a {
gst_array.append_value(deserialize_serde_value(&val)?);
}
Ok(gst_array.to_send_value())
}
serde_json::Value::Object(_) => {
Ok(deserialize_serde_object(val, "webrtcsink-deserialized")?.to_send_value())
}
}
}
fn deserialize_serde_object(obj: &serde_json::Value, name: &str) -> Result<gst::Structure, Error> {
let serde_json::Value::Object(map) = obj else {
return Err(anyhow!("not a serde object"));
};
let mut ret = gst::Structure::builder(name);
for (key, value) in map {
ret = ret.field(key, deserialize_serde_value(value)?);
}
Ok(ret.build())
}
fn handle_control_event(
sink: &super::BaseWebRTCSink,
msg: &str,
) -> Result<utils::ControlResponseMessage, Error> {
let msg: utils::ControlRequestMessage = serde_json::from_str(msg)?;
let event = match msg.request {
utils::ControlRequest::NavigationEvent { event } => {
gst::event::Navigation::new(event.structure())
}
utils::ControlRequest::CustomUpstreamEvent {
structure_name,
structure,
} => {
gst::event::CustomUpstream::new(deserialize_serde_object(&structure, &structure_name)?)
}
};
gst::log!(CAT, obj = sink, "Processing control event: {:?}", event);
let mut ret = false;
if let Some(mid) = msg.mid {
let this = sink.imp();
let state = this.state.lock().unwrap();
if let Some(stream_name) = state.mids.get(&mid) {
if let Some(stream) = state.streams.get(stream_name) {
if !stream.sink_pad.push_event(event.clone()) {
gst::info!(CAT, obj = sink, "Could not send event: {:?}", event);
} else {
ret = true;
}
}
}
} else {
let this = sink.imp();
let state = this.state.lock().unwrap();
state.streams.iter().for_each(|(_, stream)| {
if !stream.sink_pad.push_event(event.clone()) {
gst::info!(CAT, obj = sink, "Could not send event: {:?}", event);
} else {
ret = true;
}
});
}
Ok(utils::ControlResponseMessage {
id: msg.id,
error: if ret {
None
} else {
Some("No sink pad could handle the request".to_string())
},
})
}
/// Simple utility for tearing down a pipeline cleanly
struct PipelineWrapper(gst::Pipeline);
@ -557,6 +665,11 @@ struct PipelineWrapper(gst::Pipeline);
#[derive(Debug)]
struct NavigationEventHandler((glib::SignalHandlerId, WebRTCDataChannel));
// Structure to generate arbitrary upstream events from a WebRTCDataChannel
#[allow(dead_code)]
#[derive(Debug)]
struct ControlRequestHandler((glib::SignalHandlerId, WebRTCDataChannel));
/// Our instance structure
#[derive(Default)]
pub struct BaseWebRTCSink {
@ -589,6 +702,7 @@ impl Default for Settings {
do_retransmission: DEFAULT_DO_RETRANSMISSION,
do_clock_signalling: DEFAULT_DO_CLOCK_SIGNALLING,
enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
enable_control_data_channel: DEFAULT_ENABLE_CONTROL_DATA_CHANNEL,
meta: None,
ice_transport_policy: DEFAULT_ICE_TRANSPORT_POLICY,
signaller: signaller.upcast(),
@ -622,6 +736,7 @@ impl Default for State {
streams: HashMap::new(),
discoveries: HashMap::new(),
navigation_handler: None,
control_events_handler: None,
mids: HashMap::new(),
signaller_signals: Default::default(),
finalizing_sessions: Arc::new((Mutex::new(HashSet::new()), Condvar::new())),
@ -1643,6 +1758,51 @@ impl NavigationEventHandler {
}
}
impl ControlRequestHandler {
fn new(element: &super::BaseWebRTCSink, webrtcbin: &gst::Element) -> Self {
let channel = webrtcbin.emit_by_name::<WebRTCDataChannel>(
"create-data-channel",
&[
&"control",
&gst::Structure::builder("config")
.field("priority", gst_webrtc::WebRTCPriorityType::High)
.build(),
],
);
Self((
channel.connect_closure(
"on-message-string",
false,
glib::closure!(
#[watch]
element,
move |channel: &WebRTCDataChannel, msg: &str| {
match handle_control_event(element, msg) {
Err(err) => {
gst::error!(CAT, "Failed to handle control event: {err:?}");
}
Ok(msg) => match serde_json::to_string(&msg).ok() {
Some(s) => {
channel.send_string(Some(s.as_str()));
}
None => {
gst::error!(
CAT,
obj = element,
"Failed to serialize control response",
);
}
},
}
}
),
),
channel,
))
}
}
/// How to configure RTP extensions for payloaders, if at all
enum ExtensionConfigurationType {
/// Skip configuration, do not add any extensions
@ -3176,6 +3336,7 @@ impl BaseWebRTCSink {
}
let enable_data_channel_navigation = settings_clone.enable_data_channel_navigation;
let enable_control_data_channel = settings_clone.enable_control_data_channel;
drop(settings_clone);
@ -3207,6 +3368,12 @@ impl BaseWebRTCSink {
Some(NavigationEventHandler::new(&element, &webrtcbin));
}
if enable_control_data_channel {
let mut state = this.state.lock().unwrap();
state.control_events_handler =
Some(ControlRequestHandler::new(&element, &webrtcbin));
}
// This is intentionally emitted with the pipeline in the Ready state,
// so that application code can create data channels at the correct
// moment.
@ -4167,12 +4334,32 @@ impl ObjectImpl for BaseWebRTCSink {
.default_value(DEFAULT_DO_CLOCK_SIGNALLING)
.mutable_ready()
.build(),
/**
* GstBaseWebRTCSink:enable-data-channel-navigation:
*
* Enable navigation events through a dedicated WebRTCDataChannel.
*
* Deprecated:plugins-rs-0.14.0: Use #GstBaseWebRTCSink:enable-control-data-channel
*/
glib::ParamSpecBoolean::builder("enable-data-channel-navigation")
.nick("Enable data channel navigation")
.blurb("Enable navigation events through a dedicated WebRTCDataChannel")
.default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION)
.mutable_ready()
.build(),
/**
* GstBaseWebRTCSink:enable-control-data-channel:
*
* Enable receiving arbitrary events through data channel.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecBoolean::builder("enable-control-data-channel")
.nick("Enable control data channel")
.blurb("Enable receiving arbitrary events through data channel")
.default_value(DEFAULT_ENABLE_CONTROL_DATA_CHANNEL)
.mutable_ready()
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("meta")
.nick("Meta")
.blurb("Free form metadata about the producer")
@ -4342,6 +4529,11 @@ impl ObjectImpl for BaseWebRTCSink {
settings.enable_data_channel_navigation =
value.get::<bool>().expect("type checked upstream");
}
"enable-control-data-channel" => {
let mut settings = self.settings.lock().unwrap();
settings.enable_control_data_channel =
value.get::<bool>().expect("type checked upstream");
}
"meta" => {
let mut settings = self.settings.lock().unwrap();
settings.meta = value
@ -4456,6 +4648,10 @@ impl ObjectImpl for BaseWebRTCSink {
let settings = self.settings.lock().unwrap();
settings.enable_data_channel_navigation.to_value()
}
"enable-control-data-channel" => {
let settings = self.settings.lock().unwrap();
settings.enable_control_data_channel.to_value()
}
"stats" => self.gather_stats().to_value(),
"meta" => {
let settings = self.settings.lock().unwrap();

View file

@ -3,7 +3,7 @@
use gst::prelude::*;
use crate::signaller::{prelude::*, Signallable, Signaller};
use crate::utils::{Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS};
use crate::utils::{self, Codec, Codecs, NavigationEvent, AUDIO_CAPS, RTP_CAPS, VIDEO_CAPS};
use crate::webrtcsrc::WebRTCSrcPad;
use anyhow::{Context, Error};
use gst::glib;
@ -22,6 +22,7 @@ use url::Url;
const DEFAULT_STUN_SERVER: Option<&str> = Some("stun://stun.l.google.com:19302");
const DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION: bool = false;
const DEFAULT_ENABLE_CONTROL_DATA_CHANNEL: bool = false;
const DEFAULT_DO_RETRANSMISSION: bool = true;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -40,6 +41,7 @@ struct Settings {
video_codecs: Vec<Codec>,
audio_codecs: Vec<Codec>,
enable_data_channel_navigation: bool,
enable_control_data_channel: bool,
do_retransmission: bool,
}
@ -68,62 +70,83 @@ impl ObjectImpl for BaseWebRTCSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPS: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecString::builder("stun-server")
.nick("The STUN server to use")
.blurb("The STUN server of the form stun://host:port")
.flags(glib::ParamFlags::READWRITE)
.default_value(DEFAULT_STUN_SERVER)
.mutable_ready()
.build(),
gst::ParamSpecArray::builder("turn-servers")
.nick("List of TURN servers to use")
.blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
.element_spec(&glib::ParamSpecString::builder("turn-server")
.nick("TURN Server")
.blurb("The TURN server of the form turn(s)://username:password@host:port.")
.build()
)
.mutable_ready()
.build(),
glib::ParamSpecObject::builder::<Signallable>("signaller")
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.blurb("The Signallable object to use to handle WebRTC Signalling")
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("meta")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb("Free form metadata about the consumer")
.build(),
gst::ParamSpecArray::builder("video-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::video_codecs()
.map(|c| c.name.as_str())
.join(", ")
))
.element_spec(&glib::ParamSpecString::builder("video-codec-name").build())
.build(),
gst::ParamSpecArray::builder("audio-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::audio_codecs()
.map(|c| c.name.as_str())
.join(", ")
))
.element_spec(&glib::ParamSpecString::builder("audio-codec-name").build())
.build(),
glib::ParamSpecBoolean::builder("enable-data-channel-navigation")
.nick("Enable data channel navigation")
.blurb("Enable navigation events through a dedicated WebRTCDataChannel")
.default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION)
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder("do-retransmission")
.nick("Enable retransmission")
.blurb("Send retransmission events upstream when a packet is late")
.default_value(DEFAULT_DO_RETRANSMISSION)
.mutable_ready()
.build(),
]
glib::ParamSpecString::builder("stun-server")
.nick("The STUN server to use")
.blurb("The STUN server of the form stun://host:port")
.flags(glib::ParamFlags::READWRITE)
.default_value(DEFAULT_STUN_SERVER)
.mutable_ready()
.build(),
gst::ParamSpecArray::builder("turn-servers")
.nick("List of TURN servers to use")
.blurb("The TURN servers of the form <\"turn(s)://username:password@host:port\", \"turn(s)://username1:password1@host1:port1\">")
.element_spec(&glib::ParamSpecString::builder("turn-server")
.nick("TURN Server")
.blurb("The TURN server of the form turn(s)://username:password@host:port.")
.build()
)
.mutable_ready()
.build(),
glib::ParamSpecObject::builder::<Signallable>("signaller")
.flags(glib::ParamFlags::READWRITE | glib::ParamFlags::CONSTRUCT_ONLY)
.blurb("The Signallable object to use to handle WebRTC Signalling")
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("meta")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb("Free form metadata about the consumer")
.build(),
gst::ParamSpecArray::builder("video-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::video_codecs()
.map(|c| c.name.as_str())
.join(", ")
))
.element_spec(&glib::ParamSpecString::builder("video-codec-name").build())
.build(),
gst::ParamSpecArray::builder("audio-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::audio_codecs()
.map(|c| c.name.as_str())
.join(", ")
))
.element_spec(&glib::ParamSpecString::builder("audio-codec-name").build())
.build(),
/**
* GstBaseWebRTCSrc:enable-data-channel-navigation:
*
* Enable navigation events through a dedicated WebRTCDataChannel.
*
* Deprecated:plugins-rs-0.14.0: Use #GstBaseWebRTCSrc:enable-control-data-channel
*/
glib::ParamSpecBoolean::builder("enable-data-channel-navigation")
.nick("Enable data channel navigation")
.blurb("Enable navigation events through a dedicated WebRTCDataChannel")
.default_value(DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION)
.mutable_ready()
.build(),
/**
* GstBaseWebRTCSink:enable-control-data-channel:
*
* Enable sending control requests through data channel.
* This includes but is not limited to the forwarding of navigation events.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecBoolean::builder("enable-control-data-channel")
.nick("Enable control data channel")
.blurb("Enable sending control requests through a dedicated WebRTCDataChannel")
.default_value(DEFAULT_ENABLE_CONTROL_DATA_CHANNEL)
.mutable_ready()
.build(),
glib::ParamSpecBoolean::builder("do-retransmission")
.nick("Enable retransmission")
.blurb("Send retransmission events upstream when a packet is late")
.default_value(DEFAULT_DO_RETRANSMISSION)
.mutable_ready()
.build(),
]
});
PROPS.as_ref()
@ -181,6 +204,10 @@ impl ObjectImpl for BaseWebRTCSrc {
let mut settings = self.settings.lock().unwrap();
settings.enable_data_channel_navigation = value.get::<bool>().unwrap();
}
"enable-control-data-channel" => {
let mut settings = self.settings.lock().unwrap();
settings.enable_control_data_channel = value.get::<bool>().unwrap();
}
"do-retransmission" => {
let mut settings = self.settings.lock().unwrap();
settings.do_retransmission = value.get::<bool>().unwrap();
@ -217,6 +244,10 @@ impl ObjectImpl for BaseWebRTCSrc {
let settings = self.settings.lock().unwrap();
settings.enable_data_channel_navigation.to_value()
}
"enable-control-data-channel" => {
let settings = self.settings.lock().unwrap();
settings.enable_control_data_channel.to_value()
}
"do-retransmission" => self.settings.lock().unwrap().do_retransmission.to_value(),
name => panic!("{} getter not implemented", name),
}
@ -285,6 +316,7 @@ impl Default for Settings {
.cloned()
.collect(),
enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
enable_control_data_channel: DEFAULT_ENABLE_CONTROL_DATA_CHANNEL,
do_retransmission: DEFAULT_DO_RETRANSMISSION,
}
}
@ -318,6 +350,7 @@ impl Session {
n_video_pads: AtomicU16::new(0),
n_audio_pads: AtomicU16::new(0),
flow_combiner: Mutex::new(gst_base::UniqueFlowCombiner::new()),
request_counter: 0,
})
}
@ -391,6 +424,40 @@ impl Session {
}
}
fn send_control_request(
&mut self,
request: utils::ControlRequest,
element: &super::BaseWebRTCSrc,
) {
if let Some(data_channel) = &self.data_channel.borrow_mut() {
let msg = utils::ControlRequestMessage {
id: self.request_counter,
mid: None,
request,
};
self.request_counter += 1;
match serde_json::to_string(&msg).ok() {
Some(str) => {
gst::trace!(
CAT,
obj = element,
"Sending control request to peer for session {}",
self.id
);
data_channel.send_string(Some(str.as_str()));
}
None => {
gst::error!(
CAT,
obj = element,
"Could not serialize navigation event for session {}",
self.id
);
}
}
}
}
// Creates a bin which contains the webrtcbin, encoded filter (if requested) plus parser
// and decoder (if needed) for every session
//
@ -482,13 +549,15 @@ impl Session {
))
.build();
if element
.imp()
.settings
.lock()
.unwrap()
.enable_data_channel_navigation
{
let (enable_data_channel_navigation, enable_control_data_channel) = {
let settings = element.imp().settings.lock().unwrap();
(
settings.enable_data_channel_navigation,
settings.enable_control_data_channel,
)
};
if enable_data_channel_navigation || enable_control_data_channel {
webrtcbin_pad.add_probe(
gst::PadProbeType::EVENT_UPSTREAM,
glib::clone!(
@ -508,10 +577,18 @@ impl Session {
let mut state = element.imp().state.lock().unwrap();
if let Some(session) = state.sessions.get_mut(&sess_id) {
session.send_navigation_event(
gst_video::NavigationEvent::parse(ev).unwrap(),
&element,
);
if enable_data_channel_navigation {
session.send_navigation_event(
gst_video::NavigationEvent::parse(ev).unwrap(),
&element,
);
}
if enable_control_data_channel {
let request = utils::ControlRequest::NavigationEvent {
event: gst_video::NavigationEvent::parse(ev).unwrap(),
};
session.send_control_request(request, &element);
}
} else {
gst::error!(CAT, obj = element, "session {sess_id:?} does not exist");
}
@ -1432,8 +1509,15 @@ impl ElementImpl for BaseWebRTCSrc {
fn send_event(&self, event: gst::Event) -> bool {
match event.view() {
gst::EventView::Navigation(ev) => {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
// Return without potentially warning
if !settings.enable_data_channel_navigation && !settings.enable_control_data_channel
{
return true;
}
if state.sessions.len() != 1 {
gst::warning!(
CAT,
@ -1444,15 +1528,29 @@ impl ElementImpl for BaseWebRTCSrc {
);
false
} else {
state
.sessions
.values_mut()
.next()
.unwrap()
.send_navigation_event(
gst_video::NavigationEvent::parse(ev).unwrap(),
&self.obj(),
);
if settings.enable_data_channel_navigation {
state
.sessions
.values_mut()
.next()
.unwrap()
.send_navigation_event(
gst_video::NavigationEvent::parse(ev).unwrap(),
&self.obj(),
);
}
if settings.enable_control_data_channel {
let request = utils::ControlRequest::NavigationEvent {
event: gst_video::NavigationEvent::parse(ev).unwrap(),
};
state
.sessions
.values_mut()
.next()
.unwrap()
.send_control_request(request, &self.obj());
}
true
}
}
@ -1502,6 +1600,7 @@ struct Session {
n_video_pads: AtomicU16,
n_audio_pads: AtomicU16,
flow_combiner: Mutex<gst_base::UniqueFlowCombiner>,
request_counter: u64,
}
struct State {
sessions: HashMap<String, Session>,