From 6a23ae168f72ba4030bd208b8259bba997939ab2 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Fri, 23 Aug 2024 17:56:16 +0200 Subject: [PATCH] webrtcsink: implement mechanism to forward metas over control channel It may be desirable for the frontend to receive ancillary information over the control channel. Such information includes but is not limited to time code metas, support for other metas (eg custom meta) might be implemented in the future, as well as downstream events. This patch implements a new info message, probes buffers that arrive at nicesink to look up timecode metas and potentially forwards them to the consumer when the `forward-metas` property is set appropriately. Internally, a "dye" meta is used to trace the media identifier the packet we are about to send over relates to, as rtpfunnel bundles all packets together. The example frontend code also gets a minor update and now logs info messages to the console. Part-of: --- docs/plugins/gst_plugins_cache.json | 12 ++ net/webrtc/README.md | 42 +++++ net/webrtc/gstwebrtc-api/index.html | 3 + .../gstwebrtc-api/src/remote-controller.js | 21 +++ net/webrtc/src/utils.rs | 123 ++++++++++++ net/webrtc/src/webrtcsink/imp.rs | 178 +++++++++++++++++- 6 files changed, 378 insertions(+), 1 deletion(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 0fe6810a..175a9b0d 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -10496,6 +10496,18 @@ "type": "gboolean", "writable": true }, + "forward-metas": { + "blurb": "Comma-separated list of buffer meta names to forward over the control data channel. Currently supported names are: timecode", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "", + "mutable": "playing", + "readable": true, + "type": "gchararray", + "writable": true + }, "ice-transport-policy": { "blurb": "The policy to apply for ICE transport", "conditionally-available": false, diff --git a/net/webrtc/README.md b/net/webrtc/README.md index 1bc40045..f27e987c 100644 --- a/net/webrtc/README.md +++ b/net/webrtc/README.md @@ -257,6 +257,48 @@ Code in [gstwebrtc-api](gstwebrtc-api) is also licensed under the [Mozilla Public License Version 2.0]: http://opensource.org/licenses/MPL-2.0 +## Control data channel + +When the control data channel is enabled using the `enable-control-data-channel` +property, `webrtcsink` can optionally forward serialized metas and events to the +consumer. + +Currently only timecode metas are supported, but other metas and events can easily +be added in a backward-compatible manner. + +Here is an example for forwarding timecodes: + +``` +gst-launch-1.0 videotestsrc ! timecodestamper ! webrtcsink forward-metas="timecode" name=ws enable-control-data-channel=true run-signalling-server=true run-web-server=true +``` + +Point your web browser to `http://127.0.0.1:8080`, open the console and click +on the producer ID, you should see such messages in the console once the video +is displayed: + +``` +{ + "type": "InfoMessage", + "mid": "video0", + "info": { + "meta": { + "type": "timeCode", + "hours": 0, + "minutes": 0, + "seconds": 5, + "frames": 17, + "field_count": 0, + "fps": [ + 30, + 1 + ], + "flags": "none", + "latest_daily_jam": "2024-08-23T17:51:42+0200" + } + } +} +``` + ## Using the AWS KVS signaller * Setup AWS Kinesis Video Streams diff --git a/net/webrtc/gstwebrtc-api/index.html b/net/webrtc/gstwebrtc-api/index.html index 858696c9..2c3ddf12 100644 --- a/net/webrtc/gstwebrtc-api/index.html +++ b/net/webrtc/gstwebrtc-api/index.html @@ -418,6 +418,9 @@ entryElement.classList.add("has-remote-control"); submitRequestButtonElement.disabled = false; remoteController.attachVideoElement(videoElement); + remoteController.addEventListener("info", (e) => { + console.log("Received info message from producer: ", e.detail); + }); } else { entryElement.classList.remove("has-remote-control"); submitRequestButtonElement.disabled = true; diff --git a/net/webrtc/gstwebrtc-api/src/remote-controller.js b/net/webrtc/gstwebrtc-api/src/remote-controller.js index a32fde80..6af8c484 100644 --- a/net/webrtc/gstwebrtc-api/src/remote-controller.js +++ b/net/webrtc/gstwebrtc-api/src/remote-controller.js @@ -64,6 +64,23 @@ function getModifiers(event) { return modifiers.join("+"); } +/** + * Event name: "info".
+ * Triggered when a remote peer sends an information message over the control data channel. + * @event GstWebRTCAPI#InfoEvent + * @type {external:CustomEvent} + * @property {object} detail - The info message + * @see GstWebRTCAPI.RemoteController + */ +/** + * Event name: "controlResponse".
+ * Triggered when a remote peer sends a response after a control request. + * @event GstWebRTCAPI#ControlResponseEvent + * @type {external:CustomEvent} + * @property {object} detail - The response message + * @see GstWebRTCAPI.RemoteController + */ + /** * @class GstWebRTCAPI.RemoteController * @hideconstructor @@ -78,6 +95,8 @@ function getModifiers(event) { * @extends {external:EventTarget} * @fires {@link GstWebRTCAPI#event:ErrorEvent} * @fires {@link GstWebRTCAPI#event:ClosedEvent} + * @fires {@link GstWebRTCAPI#event:InfoEvent} + * @fires {@link GstWebRTCAPI#event:ControlResponseEvent} * @see GstWebRTCAPI.ConsumerSession#remoteController * @see GstWebRTCAPI.RemoteController#attachVideoElement * @see https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/tree/main/net/webrtc/gstwebrtc-api#produce-a-gstreamer-interactive-webrtc-stream-with-remote-control @@ -117,6 +136,8 @@ export default class RemoteController extends EventTarget { if (msg.type === "ControlResponseMessage") { this.dispatchEvent(new CustomEvent("controlResponse", { detail: msg })); + } else if (msg.type === "InfoMessage") { + this.dispatchEvent(new CustomEvent("info", { detail: msg })); } } catch (ex) { this.dispatchEvent(new ErrorEvent("error", { diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index a04350d6..0bd0fae9 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -1213,6 +1213,92 @@ pub fn deserialize_serde_object( Ok(ret.build()) } +#[derive(Debug, PartialEq, Eq)] +pub struct VideoTimeCodeFlags(gst_video::VideoTimeCodeFlags); + +impl serde::Serialize for VideoTimeCodeFlags { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + if self.0.bits() == 0 { + serializer.serialize_str("none") + } else { + let mut ret = String::new(); + if self.0.contains(gst_video::VideoTimeCodeFlags::DROP_FRAME) { + ret.push_str("drop-frame"); + if self.0.contains(gst_video::VideoTimeCodeFlags::INTERLACED) { + ret.push_str("+interlaced"); + } + } else if self.0.contains(gst_video::VideoTimeCodeFlags::INTERLACED) { + ret.push_str("interlaced"); + } + serializer.serialize_str(&ret) + } + } +} + +#[derive(Serialize, Debug, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub enum Meta { + TimeCode { + hours: u32, + minutes: u32, + seconds: u32, + frames: u32, + field_count: u32, + fps: gst::Fraction, + flags: VideoTimeCodeFlags, + latest_daily_jam: Option, + }, +} + +#[derive(Serialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +pub enum Info { + Meta(Meta), +} + +#[derive(Serialize, Debug, PartialEq)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "type")] +pub struct InfoMessage { + pub mid: String, + pub info: Info, +} + +pub fn serialize_meta(buffer: &gst::BufferRef, forward_metas: &HashSet) -> Vec { + let mut ret = vec![]; + + buffer.foreach_meta(|meta| { + if forward_metas.contains("timecode") { + if let Some(tc_meta) = meta.downcast_ref::() { + let tc = tc_meta.tc(); + ret.push(Meta::TimeCode { + hours: tc.hours(), + minutes: tc.minutes(), + seconds: tc.seconds(), + frames: tc.frames(), + field_count: tc.field_count(), + fps: tc.fps(), + flags: VideoTimeCodeFlags(tc.flags()), + latest_daily_jam: tc + .latest_daily_jam() + .and_then(|dt| { + let gst_dt: gst::DateTime = dt.into(); + gst_dt.to_iso8601_string().ok() + }) + .map(|s| s.to_string()), + }); + } + } + std::ops::ControlFlow::Continue(()) + }); + + ret +} + #[cfg(test)] mod tests { use super::*; @@ -1278,6 +1364,43 @@ mod tests { Ok(()) } + #[test] + fn test_serialize_meta() -> Result<(), String> { + gst::init().unwrap(); + + let mut buffer = gst::Buffer::new(); + let time_code = gst_video::VideoTimeCode::new( + gst::Fraction::new(30, 1), + None, + gst_video::VideoTimeCodeFlags::empty(), + 10, + 53, + 17, + 0, + 0, + ); + gst_video::VideoTimeCodeMeta::add( + buffer.get_mut().unwrap(), + &time_code.try_into().unwrap(), + ); + + assert_eq!( + serialize_meta(&buffer, &[String::from("timecode")].into()), + vec![Meta::TimeCode { + hours: 10, + minutes: 53, + seconds: 17, + frames: 0, + field_count: 0, + fps: gst::Fraction::new(30, 1), + flags: VideoTimeCodeFlags(gst_video::VideoTimeCodeFlags::empty()), + latest_daily_jam: None, + }] + ); + + Ok(()) + } + fn test_find_smallest_available_ext_id_case( ids: impl IntoIterator, expected: u32, diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index e11a6d40..83e2d719 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -85,6 +85,7 @@ const DEFAULT_WEB_SERVER_PATH: Option<&str> = None; const DEFAULT_WEB_SERVER_DIRECTORY: &str = "gstwebrtc-api/dist"; #[cfg(feature = "web_server")] const DEFAULT_WEB_SERVER_HOST_ADDR: &str = "http://127.0.0.1:8080"; +const DEFAULT_FORWARD_METAS: &str = ""; /* Start adding some FEC when the bitrate > 2Mbps as we found experimentally * that it is not worth it below that threshold */ #[cfg(feature = "v1_22")] @@ -126,6 +127,7 @@ struct Settings { web_server_directory: String, #[cfg(feature = "web_server")] web_server_host_addr: url::Url, + forward_metas: HashSet, } #[derive(Debug, Clone)] @@ -512,6 +514,7 @@ struct State { #[cfg(feature = "web_server")] web_join_handle: Option>, session_mids: HashMap>, + session_stream_names: HashMap>, } fn create_navigation_event(sink: &super::BaseWebRTCSink, msg: &str, session_id: &str) { @@ -687,6 +690,7 @@ impl Default for Settings { web_server_directory: String::from(DEFAULT_WEB_SERVER_DIRECTORY), #[cfg(feature = "web_server")] web_server_host_addr: url::Url::parse(DEFAULT_WEB_SERVER_HOST_ADDR).unwrap(), + forward_metas: HashSet::new(), } } } @@ -713,6 +717,7 @@ impl Default for State { #[cfg(feature = "web_server")] web_join_handle: None, session_mids: HashMap::new(), + session_stream_names: HashMap::new(), } } } @@ -2859,6 +2864,45 @@ impl BaseWebRTCSink { signaller.add_ice(&session_id, &candidate, sdp_m_line_index, None) } + fn send_buffer_metas( + &self, + state: &State, + settings: &Settings, + session_id: &str, + buffer: &gst::BufferRef, + ) { + // This should probably never happen as the transform + // function for the dye meta always copies, but we can't + // rely on the behavior of all encoders / payloaders. + let Ok(dye_meta) = gst::meta::CustomMeta::from_buffer(buffer, "webrtcsink-dye") else { + return; + }; + let stream_name = dye_meta.structure().get::("stream-name").unwrap(); + let Some(mid) = state + .session_stream_names + .get(session_id) + .and_then(|names| names.get(&stream_name)) + else { + return; + }; + + if let Some(ref handler) = state.control_events_handler { + for meta in utils::serialize_meta(buffer, &settings.forward_metas) { + match serde_json::to_string(&utils::InfoMessage { + mid: mid.to_owned(), + info: utils::Info::Meta(meta), + }) { + Ok(msg) => { + handler.0 .1.send_string(Some(msg.as_str())); + } + Err(err) => { + gst::warning!(CAT, imp = self, "Failed to serialize info message: {err:?}",); + } + } + } + } + } + /// Called by the signaller to add a new session fn start_session( &self, @@ -2996,6 +3040,64 @@ impl BaseWebRTCSink { _ => None, }; + webrtcbin.connect_closure( + "deep-element-added", + false, + glib::closure!( + #[strong] + session_id, + #[watch] + element, + move |_webrtcbin: gst::Element, _bin: gst::Bin, e: gst::Element| { + if e.factory().map_or(false, |f| f.name() == "nicesink") { + let sinkpad = e.static_pad("sink").unwrap(); + + let session_id = session_id.clone(); + let element_clone = element.downgrade(); + sinkpad.add_probe( + gst::PadProbeType::BUFFER + | gst::PadProbeType::BUFFER_LIST + | gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, info| { + let Some(element) = element_clone.upgrade() else { + return gst::PadProbeReturn::Remove; + }; + let this = element.imp(); + let settings = this.settings.lock().unwrap(); + if settings.forward_metas.is_empty() { + return gst::PadProbeReturn::Ok; + } + + let state = this.state.lock().unwrap(); + match info.data { + Some(gst::PadProbeData::Buffer(ref buffer)) => { + this.send_buffer_metas( + &state, + &settings, + &session_id, + buffer, + ); + } + Some(gst::PadProbeData::BufferList(ref list)) => { + for buffer in list.iter() { + this.send_buffer_metas( + &state, + &settings, + &session_id, + buffer, + ); + } + } + _ => (), + } + gst::PadProbeReturn::Ok + }, + ); + } + } + ), + ); + pipeline.add(&webrtcbin).unwrap(); webrtcbin.connect_closure( @@ -3560,6 +3662,11 @@ impl BaseWebRTCSink { .entry(session_id.clone()) .or_default() .insert(mid.to_string(), stream_name.clone()); + state + .session_stream_names + .entry(session_id.clone()) + .or_default() + .insert(stream_name.clone(), mid.to_string()); } if let Some(producer) = state @@ -4239,11 +4346,27 @@ impl BaseWebRTCSink { )); } + fn dye_buffer(&self, mut buffer: gst::Buffer, stream_name: &str) -> Result { + let buf_mut = buffer.make_mut(); + + let mut m = gst::meta::CustomMeta::add(buf_mut, "webrtcsink-dye")?; + + m.mut_structure().set("stream-name", stream_name); + + Ok(buffer) + } + fn chain( &self, pad: &gst::GhostPad, - buffer: gst::Buffer, + mut buffer: gst::Buffer, ) -> Result { + buffer = self + .dye_buffer(buffer, pad.name().as_str()) + .map_err(|err| { + gst::error!(CAT, obj = pad, "Failed to dye buffer: {err}"); + gst::FlowError::Error + })?; self.start_stream_discovery_if_needed(pad.name().as_str()); self.feed_discoveries(pad.name().as_str(), &buffer); @@ -4259,8 +4382,30 @@ impl ObjectSubclass for BaseWebRTCSink { type Interfaces = (gst::ChildProxy, gst_video::Navigation); } +fn register_dye_meta() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::meta::CustomMeta::register_with_transform( + "webrtcsink-dye", + &[], + move |outbuf, meta, _inbuf, _type| { + let stream_name = meta.structure().get::("stream-name").unwrap(); + if let Ok(mut m) = gst::meta::CustomMeta::add(outbuf, "webrtcsink-dye") { + m.mut_structure().set("stream-name", stream_name); + true + } else { + false + } + }, + ); + }); +} + unsafe impl IsSubclassable for super::BaseWebRTCSink { fn class_init(class: &mut glib::Class) { + register_dye_meta(); Self::parent_class_init::(class); } } @@ -4484,6 +4629,24 @@ impl ObjectImpl for BaseWebRTCSink { .blurb("Address the web server should listen on") .default_value(DEFAULT_WEB_SERVER_HOST_ADDR) .build(), + /** + * GstBaseWebRTCSink:forward-metas: + * + * Comma-separated list of buffer metas to forward over the + * control data channel, if any. + * + * Currently supported names are: + * + * - timecode + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecString::builder("forward-metas") + .nick("Forward metas") + .blurb("Comma-separated list of buffer meta names to forward over the control data channel. Currently supported names are: timecode") + .default_value(DEFAULT_FORWARD_METAS) + .mutable_playing() + .build(), ] }); @@ -4616,6 +4779,15 @@ impl ObjectImpl for BaseWebRTCSink { }; settings.web_server_host_addr = host_addr; } + "forward-metas" => { + let mut settings = self.settings.lock().unwrap(); + settings.forward_metas = value + .get::() + .expect("type checked upstream") + .split(",") + .map(String::from) + .collect(); + } _ => unimplemented!(), } } @@ -4714,6 +4886,10 @@ impl ObjectImpl for BaseWebRTCSink { let settings = self.settings.lock().unwrap(); settings.web_server_host_addr.to_string().to_value() } + "forward-metas" => { + let settings = self.settings.lock().unwrap(); + settings.forward_metas.iter().join(",").to_value() + } _ => unimplemented!(), } }