webrtcsink: implement mechanism to forward metas over control channel

It may be desirable for the frontend to receive ancillary information
over the control channel.

Such information includes but is not limited to time code metas, support
for other metas (eg custom meta) might be implemented in the future, as
well as downstream events.

This patch implements a new info message, probes buffers that arrive at
nicesink to look up timecode metas and potentially forwards them to the
consumer when the `forward-metas` property is set appropriately.

Internally, a "dye" meta is used to trace the media identifier the
packet we are about to send over relates to, as rtpfunnel bundles all
packets together.

The example frontend code also gets a minor update and now logs info
messages to the console.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1749>
This commit is contained in:
Mathieu Duponchelle 2024-08-23 17:56:16 +02:00 committed by GStreamer Marge Bot
parent db026ad535
commit 6a23ae168f
6 changed files with 378 additions and 1 deletions

View file

@ -10496,6 +10496,18 @@
"type": "gboolean", "type": "gboolean",
"writable": true "writable": true
}, },
"forward-metas": {
"blurb": "Comma-separated list of buffer meta names to forward over the control data channel. Currently supported names are: timecode",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "",
"mutable": "playing",
"readable": true,
"type": "gchararray",
"writable": true
},
"ice-transport-policy": { "ice-transport-policy": {
"blurb": "The policy to apply for ICE transport", "blurb": "The policy to apply for ICE transport",
"conditionally-available": false, "conditionally-available": false,

View file

@ -257,6 +257,48 @@ Code in [gstwebrtc-api](gstwebrtc-api) is also licensed under the
[Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0 [Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0
## Control data channel
When the control data channel is enabled using the `enable-control-data-channel`
property, `webrtcsink` can optionally forward serialized metas and events to the
consumer.
Currently only timecode metas are supported, but other metas and events can easily
be added in a backward-compatible manner.
Here is an example for forwarding timecodes:
```
gst-launch-1.0 videotestsrc ! timecodestamper ! webrtcsink forward-metas="timecode" name=ws enable-control-data-channel=true run-signalling-server=true run-web-server=true
```
Point your web browser to `http://127.0.0.1:8080`, open the console and click
on the producer ID, you should see such messages in the console once the video
is displayed:
```
{
"type": "InfoMessage",
"mid": "video0",
"info": {
"meta": {
"type": "timeCode",
"hours": 0,
"minutes": 0,
"seconds": 5,
"frames": 17,
"field_count": 0,
"fps": [
30,
1
],
"flags": "none",
"latest_daily_jam": "2024-08-23T17:51:42+0200"
}
}
}
```
## Using the AWS KVS signaller ## Using the AWS KVS signaller
* Setup AWS Kinesis Video Streams * Setup AWS Kinesis Video Streams

View file

@ -418,6 +418,9 @@
entryElement.classList.add("has-remote-control"); entryElement.classList.add("has-remote-control");
submitRequestButtonElement.disabled = false; submitRequestButtonElement.disabled = false;
remoteController.attachVideoElement(videoElement); remoteController.attachVideoElement(videoElement);
remoteController.addEventListener("info", (e) => {
console.log("Received info message from producer: ", e.detail);
});
} else { } else {
entryElement.classList.remove("has-remote-control"); entryElement.classList.remove("has-remote-control");
submitRequestButtonElement.disabled = true; submitRequestButtonElement.disabled = true;

View file

@ -64,6 +64,23 @@ function getModifiers(event) {
return modifiers.join("+"); return modifiers.join("+");
} }
/**
* Event name: "info".<br>
* Triggered when a remote peer sends an information message over the control data channel.
* @event GstWebRTCAPI#InfoEvent
* @type {external:CustomEvent}
* @property {object} detail - The info message
* @see GstWebRTCAPI.RemoteController
*/
/**
* Event name: "controlResponse".<br>
* Triggered when a remote peer sends a response after a control request.
* @event GstWebRTCAPI#ControlResponseEvent
* @type {external:CustomEvent}
* @property {object} detail - The response message
* @see GstWebRTCAPI.RemoteController
*/
/** /**
* @class GstWebRTCAPI.RemoteController * @class GstWebRTCAPI.RemoteController
* @hideconstructor * @hideconstructor
@ -78,6 +95,8 @@ function getModifiers(event) {
* @extends {external:EventTarget} * @extends {external:EventTarget}
* @fires {@link GstWebRTCAPI#event:ErrorEvent} * @fires {@link GstWebRTCAPI#event:ErrorEvent}
* @fires {@link GstWebRTCAPI#event:ClosedEvent} * @fires {@link GstWebRTCAPI#event:ClosedEvent}
* @fires {@link GstWebRTCAPI#event:InfoEvent}
* @fires {@link GstWebRTCAPI#event:ControlResponseEvent}
* @see GstWebRTCAPI.ConsumerSession#remoteController * @see GstWebRTCAPI.ConsumerSession#remoteController
* @see GstWebRTCAPI.RemoteController#attachVideoElement * @see GstWebRTCAPI.RemoteController#attachVideoElement
* @see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc/gstwebrtc-api#produce-a-gstreamer-interactive-webrtc-stream-with-remote-control * @see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc/gstwebrtc-api#produce-a-gstreamer-interactive-webrtc-stream-with-remote-control
@ -117,6 +136,8 @@ export default class RemoteController extends EventTarget {
if (msg.type === "ControlResponseMessage") { if (msg.type === "ControlResponseMessage") {
this.dispatchEvent(new CustomEvent("controlResponse", { detail: msg })); this.dispatchEvent(new CustomEvent("controlResponse", { detail: msg }));
} else if (msg.type === "InfoMessage") {
this.dispatchEvent(new CustomEvent("info", { detail: msg }));
} }
} catch (ex) { } catch (ex) {
this.dispatchEvent(new ErrorEvent("error", { this.dispatchEvent(new ErrorEvent("error", {

View file

@ -1213,6 +1213,92 @@ pub fn deserialize_serde_object(
Ok(ret.build()) Ok(ret.build())
} }
#[derive(Debug, PartialEq, Eq)]
pub struct VideoTimeCodeFlags(gst_video::VideoTimeCodeFlags);
impl serde::Serialize for VideoTimeCodeFlags {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
if self.0.bits() == 0 {
serializer.serialize_str("none")
} else {
let mut ret = String::new();
if self.0.contains(gst_video::VideoTimeCodeFlags::DROP_FRAME) {
ret.push_str("drop-frame");
if self.0.contains(gst_video::VideoTimeCodeFlags::INTERLACED) {
ret.push_str("+interlaced");
}
} else if self.0.contains(gst_video::VideoTimeCodeFlags::INTERLACED) {
ret.push_str("interlaced");
}
serializer.serialize_str(&ret)
}
}
}
#[derive(Serialize, Debug, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub enum Meta {
TimeCode {
hours: u32,
minutes: u32,
seconds: u32,
frames: u32,
field_count: u32,
fps: gst::Fraction,
flags: VideoTimeCodeFlags,
latest_daily_jam: Option<String>,
},
}
#[derive(Serialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
pub enum Info {
Meta(Meta),
}
#[derive(Serialize, Debug, PartialEq)]
#[serde(rename_all = "camelCase")]
#[serde(tag = "type")]
pub struct InfoMessage {
pub mid: String,
pub info: Info,
}
pub fn serialize_meta(buffer: &gst::BufferRef, forward_metas: &HashSet<String>) -> Vec<Meta> {
let mut ret = vec![];
buffer.foreach_meta(|meta| {
if forward_metas.contains("timecode") {
if let Some(tc_meta) = meta.downcast_ref::<gst_video::VideoTimeCodeMeta>() {
let tc = tc_meta.tc();
ret.push(Meta::TimeCode {
hours: tc.hours(),
minutes: tc.minutes(),
seconds: tc.seconds(),
frames: tc.frames(),
field_count: tc.field_count(),
fps: tc.fps(),
flags: VideoTimeCodeFlags(tc.flags()),
latest_daily_jam: tc
.latest_daily_jam()
.and_then(|dt| {
let gst_dt: gst::DateTime = dt.into();
gst_dt.to_iso8601_string().ok()
})
.map(|s| s.to_string()),
});
}
}
std::ops::ControlFlow::Continue(())
});
ret
}
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
@ -1278,6 +1364,43 @@ mod tests {
Ok(()) Ok(())
} }
#[test]
fn test_serialize_meta() -> Result<(), String> {
gst::init().unwrap();
let mut buffer = gst::Buffer::new();
let time_code = gst_video::VideoTimeCode::new(
gst::Fraction::new(30, 1),
None,
gst_video::VideoTimeCodeFlags::empty(),
10,
53,
17,
0,
0,
);
gst_video::VideoTimeCodeMeta::add(
buffer.get_mut().unwrap(),
&time_code.try_into().unwrap(),
);
assert_eq!(
serialize_meta(&buffer, &[String::from("timecode")].into()),
vec![Meta::TimeCode {
hours: 10,
minutes: 53,
seconds: 17,
frames: 0,
field_count: 0,
fps: gst::Fraction::new(30, 1),
flags: VideoTimeCodeFlags(gst_video::VideoTimeCodeFlags::empty()),
latest_daily_jam: None,
}]
);
Ok(())
}
fn test_find_smallest_available_ext_id_case( fn test_find_smallest_available_ext_id_case(
ids: impl IntoIterator<Item = u32>, ids: impl IntoIterator<Item = u32>,
expected: u32, expected: u32,

View file

@ -85,6 +85,7 @@ const DEFAULT_WEB_SERVER_PATH: Option<&str> = None;
const DEFAULT_WEB_SERVER_DIRECTORY: &str = "gstwebrtc-api/dist"; const DEFAULT_WEB_SERVER_DIRECTORY: &str = "gstwebrtc-api/dist";
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
const DEFAULT_WEB_SERVER_HOST_ADDR: &str = "http://127.0.0.1:8080"; const DEFAULT_WEB_SERVER_HOST_ADDR: &str = "http://127.0.0.1:8080";
const DEFAULT_FORWARD_METAS: &str = "";
/* Start adding some FEC when the bitrate > 2Mbps as we found experimentally /* Start adding some FEC when the bitrate > 2Mbps as we found experimentally
* that it is not worth it below that threshold */ * that it is not worth it below that threshold */
#[cfg(feature = "v1_22")] #[cfg(feature = "v1_22")]
@ -126,6 +127,7 @@ struct Settings {
web_server_directory: String, web_server_directory: String,
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
web_server_host_addr: url::Url, web_server_host_addr: url::Url,
forward_metas: HashSet<String>,
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -512,6 +514,7 @@ struct State {
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
web_join_handle: Option<tokio::task::JoinHandle<()>>, web_join_handle: Option<tokio::task::JoinHandle<()>>,
session_mids: HashMap<String, HashMap<String, String>>, session_mids: HashMap<String, HashMap<String, String>>,
session_stream_names: HashMap<String, HashMap<String, String>>,
} }
fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str, session_id: &str) { fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str, session_id: &str) {
@ -687,6 +690,7 @@ impl Default for Settings {
web_server_directory: String::from(DEFAULT_WEB_SERVER_DIRECTORY), web_server_directory: String::from(DEFAULT_WEB_SERVER_DIRECTORY),
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
web_server_host_addr: url::Url::parse(DEFAULT_WEB_SERVER_HOST_ADDR).unwrap(), web_server_host_addr: url::Url::parse(DEFAULT_WEB_SERVER_HOST_ADDR).unwrap(),
forward_metas: HashSet::new(),
} }
} }
} }
@ -713,6 +717,7 @@ impl Default for State {
#[cfg(feature = "web_server")] #[cfg(feature = "web_server")]
web_join_handle: None, web_join_handle: None,
session_mids: HashMap::new(), session_mids: HashMap::new(),
session_stream_names: HashMap::new(),
} }
} }
} }
@ -2859,6 +2864,45 @@ impl BaseWebRTCSink {
signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None) signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None)
} }
fn send_buffer_metas(
&self,
state: &State,
settings: &Settings,
session_id: &str,
buffer: &gst::BufferRef,
) {
// This should probably never happen as the transform
// function for the dye meta always copies, but we can't
// rely on the behavior of all encoders / payloaders.
let Ok(dye_meta) = gst::meta::CustomMeta::from_buffer(buffer, "webrtcsink-dye") else {
return;
};
let stream_name = dye_meta.structure().get::<String>("stream-name").unwrap();
let Some(mid) = state
.session_stream_names
.get(session_id)
.and_then(|names| names.get(&stream_name))
else {
return;
};
if let Some(ref handler) = state.control_events_handler {
for meta in utils::serialize_meta(buffer, &settings.forward_metas) {
match serde_json::to_string(&utils::InfoMessage {
mid: mid.to_owned(),
info: utils::Info::Meta(meta),
}) {
Ok(msg) => {
handler.0 .1.send_string(Some(msg.as_str()));
}
Err(err) => {
gst::warning!(CAT, imp = self, "Failed to serialize info message: {err:?}",);
}
}
}
}
}
/// Called by the signaller to add a new session /// Called by the signaller to add a new session
fn start_session( fn start_session(
&self, &self,
@ -2996,6 +3040,64 @@ impl BaseWebRTCSink {
_ => None, _ => None,
}; };
webrtcbin.connect_closure(
"deep-element-added",
false,
glib::closure!(
#[strong]
session_id,
#[watch]
element,
move |_webrtcbin: gst::Element, _bin: gst::Bin, e: gst::Element| {
if e.factory().map_or(false, |f| f.name() == "nicesink") {
let sinkpad = e.static_pad("sink").unwrap();
let session_id = session_id.clone();
let element_clone = element.downgrade();
sinkpad.add_probe(
gst::PadProbeType::BUFFER
| gst::PadProbeType::BUFFER_LIST
| gst::PadProbeType::EVENT_DOWNSTREAM,
move |_pad, info| {
let Some(element) = element_clone.upgrade() else {
return gst::PadProbeReturn::Remove;
};
let this = element.imp();
let settings = this.settings.lock().unwrap();
if settings.forward_metas.is_empty() {
return gst::PadProbeReturn::Ok;
}
let state = this.state.lock().unwrap();
match info.data {
Some(gst::PadProbeData::Buffer(ref buffer)) => {
this.send_buffer_metas(
&state,
&settings,
&session_id,
buffer,
);
}
Some(gst::PadProbeData::BufferList(ref list)) => {
for buffer in list.iter() {
this.send_buffer_metas(
&state,
&settings,
&session_id,
buffer,
);
}
}
_ => (),
}
gst::PadProbeReturn::Ok
},
);
}
}
),
);
pipeline.add(&webrtcbin).unwrap(); pipeline.add(&webrtcbin).unwrap();
webrtcbin.connect_closure( webrtcbin.connect_closure(
@ -3560,6 +3662,11 @@ impl BaseWebRTCSink {
.entry(session_id.clone()) .entry(session_id.clone())
.or_default() .or_default()
.insert(mid.to_string(), stream_name.clone()); .insert(mid.to_string(), stream_name.clone());
state
.session_stream_names
.entry(session_id.clone())
.or_default()
.insert(stream_name.clone(), mid.to_string());
} }
if let Some(producer) = state if let Some(producer) = state
@ -4239,11 +4346,27 @@ impl BaseWebRTCSink {
)); ));
} }
fn dye_buffer(&self, mut buffer: gst::Buffer, stream_name: &str) -> Result<gst::Buffer, Error> {
let buf_mut = buffer.make_mut();
let mut m = gst::meta::CustomMeta::add(buf_mut, "webrtcsink-dye")?;
m.mut_structure().set("stream-name", stream_name);
Ok(buffer)
}
fn chain( fn chain(
&self, &self,
pad: &gst::GhostPad, pad: &gst::GhostPad,
buffer: gst::Buffer, mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
buffer = self
.dye_buffer(buffer, pad.name().as_str())
.map_err(|err| {
gst::error!(CAT, obj = pad, "Failed to dye buffer: {err}");
gst::FlowError::Error
})?;
self.start_stream_discovery_if_needed(pad.name().as_str()); self.start_stream_discovery_if_needed(pad.name().as_str());
self.feed_discoveries(pad.name().as_str(), &buffer); self.feed_discoveries(pad.name().as_str(), &buffer);
@ -4259,8 +4382,30 @@ impl ObjectSubclass for BaseWebRTCSink {
type Interfaces = (gst::ChildProxy, gst_video::Navigation); type Interfaces = (gst::ChildProxy, gst_video::Navigation);
} }
fn register_dye_meta() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::meta::CustomMeta::register_with_transform(
"webrtcsink-dye",
&[],
move |outbuf, meta, _inbuf, _type| {
let stream_name = meta.structure().get::<String>("stream-name").unwrap();
if let Ok(mut m) = gst::meta::CustomMeta::add(outbuf, "webrtcsink-dye") {
m.mut_structure().set("stream-name", stream_name);
true
} else {
false
}
},
);
});
}
unsafe impl<T: BaseWebRTCSinkImpl> IsSubclassable<T> for super::BaseWebRTCSink { unsafe impl<T: BaseWebRTCSinkImpl> IsSubclassable<T> for super::BaseWebRTCSink {
fn class_init(class: &mut glib::Class<Self>) { fn class_init(class: &mut glib::Class<Self>) {
register_dye_meta();
Self::parent_class_init::<T>(class); Self::parent_class_init::<T>(class);
} }
} }
@ -4484,6 +4629,24 @@ impl ObjectImpl for BaseWebRTCSink {
.blurb("Address the web server should listen on") .blurb("Address the web server should listen on")
.default_value(DEFAULT_WEB_SERVER_HOST_ADDR) .default_value(DEFAULT_WEB_SERVER_HOST_ADDR)
.build(), .build(),
/**
* GstBaseWebRTCSink:forward-metas:
*
* Comma-separated list of buffer metas to forward over the
* control data channel, if any.
*
* Currently supported names are:
*
* - timecode
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecString::builder("forward-metas")
.nick("Forward metas")
.blurb("Comma-separated list of buffer meta names to forward over the control data channel. Currently supported names are: timecode")
.default_value(DEFAULT_FORWARD_METAS)
.mutable_playing()
.build(),
] ]
}); });
@ -4616,6 +4779,15 @@ impl ObjectImpl for BaseWebRTCSink {
}; };
settings.web_server_host_addr = host_addr; settings.web_server_host_addr = host_addr;
} }
"forward-metas" => {
let mut settings = self.settings.lock().unwrap();
settings.forward_metas = value
.get::<String>()
.expect("type checked upstream")
.split(",")
.map(String::from)
.collect();
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -4714,6 +4886,10 @@ impl ObjectImpl for BaseWebRTCSink {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.web_server_host_addr.to_string().to_value() settings.web_server_host_addr.to_string().to_value()
} }
"forward-metas" => {
let settings = self.settings.lock().unwrap();
settings.forward_metas.iter().join(",").to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }